This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0-preview
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0-preview by this push:
     new eb9935a  [SPARK-29283][SQL] Error message is hidden when query from JDBC, especially enabled adaptive execution
eb9935a is described below

commit eb9935ab5fed14c46e94e20563acea79ae001044
Author: lajin <[email protected]>
AuthorDate: Wed Oct 16 19:51:56 2019 -0700

    [SPARK-29283][SQL] Error message is hidden when query from JDBC, especially enabled adaptive execution
    
    ### What changes were proposed in this pull request?
    When adaptive execution is enabled, Spark users connecting via JDBC always get an adaptive-execution error, whatever the underlying root cause is. This is very confusing: we have to check the driver log to find out why.
    ```shell
    0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
    SELECT * FROM testData join testData2 ON key = v;
    Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
    0: jdbc:hive2://localhost:10000>
    ```
    
    For example, when a job queried via JDBC failed due to a missing HDFS block, the user still got only the error message `Adaptive execution failed due to stage materialization failures`.
    
    The easiest way to reproduce this is to change the code of `AdaptiveSparkPlanExec` so that it throws an exception when it receives `StageSuccess`:
    ```scala
      case class AdaptiveSparkPlanExec(
          events.drainTo(rem)
          (Seq(nextMsg) ++ rem.asScala).foreach {
            case StageSuccess(stage, res) =>
              // stage.resultOption = Some(res)
              val ex = new SparkException("Wrapper Exception",
                new IllegalArgumentException("Root cause is IllegalArgumentException for Test"))
              errors.append(
                new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
            case StageFailure(stage, ex) =>
              errors.append(
                new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
    ```
    
    ### Why are the changes needed?
    To make the error message more user-friendly and more useful for queries from JDBC.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested queries:
    ```shell
    0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
    CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
    +---------+--+
    | Result  |
    +---------+--+
    +---------+--+
    No rows selected (0.225 seconds)
    0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
    CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
    +---------+--+
    | Result  |
    +---------+--+
    +---------+--+
    No rows selected (0.043 seconds)
    ```
    Before:
    ```shell
    0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
    SELECT * FROM testData join testData2 ON key = v;
    Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
    0: jdbc:hive2://localhost:10000>
    ```
    After:
    ```shell
    0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
    SELECT * FROM testData join testData2 ON key = v;
    Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0)
    0: jdbc:hive2://localhost:10000>
    ```
    
    Closes #25960 from LantaoJin/SPARK-29283.
    
    Authored-by: lajin <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
    (cherry picked from commit fda4070ea934cac081162f70d9ea7fe2e9a07cd4)
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../thriftserver/SparkExecuteStatementOperation.scala | 17 +++++++++++------
 .../hive/thriftserver/SparkGetCatalogsOperation.scala | 18 ++++++++++++++----
 .../hive/thriftserver/SparkGetColumnsOperation.scala  | 18 ++++++++++++++----
 .../thriftserver/SparkGetFunctionsOperation.scala     | 18 ++++++++++++++----
 .../hive/thriftserver/SparkGetSchemasOperation.scala  | 18 ++++++++++++++----
 .../thriftserver/SparkGetTableTypesOperation.scala    | 18 ++++++++++++++----
 .../hive/thriftserver/SparkGetTablesOperation.scala   | 19 ++++++++++++++-----
 .../hive/thriftserver/SparkGetTypeInfoOperation.scala | 18 ++++++++++++++----
 8 files changed, 109 insertions(+), 35 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 90a428d..68197a9 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
 import org.apache.hive.service.cli._
@@ -312,12 +313,16 @@ private[hive] class SparkExecuteStatementOperation(
         } else {
           logError(s"Error executing query with $statementId, currentState $currentState, ", e)
           setState(OperationState.ERROR)
-          HiveThriftServer2.listener.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          if (e.isInstanceOf[HiveSQLException]) {
-            throw e.asInstanceOf[HiveSQLException]
-          } else {
-            throw new HiveSQLException("Error running query: " + e.toString, e)
+          e match {
+            case hiveException: HiveSQLException =>
+              HiveThriftServer2.listener.onStatementError(
+                statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+              throw hiveException
+            case _ =>
+              val root = ExceptionUtils.getRootCause(e)
+              HiveThriftServer2.listener.onStatementError(
+                statementId, root.getMessage, SparkUtils.exceptionString(root))
+              throw new HiveSQLException("Error running query: " + root.toString, root)
           }
         }
     } finally {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
index cde99fd..6c8a5b0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.util.UUID
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.GetCatalogsOperation
@@ -68,11 +69,20 @@ private[hive] class SparkGetCatalogsOperation(
       }
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get catalogs operation with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting catalogs: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 89faff2..f845a22 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
 
 import scala.collection.JavaConverters.seqAsJavaListConverter
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject}
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
 import org.apache.hive.service.cli._
@@ -129,11 +130,20 @@ private[hive] class SparkGetColumnsOperation(
       }
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get columns operation with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting columns: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 462e5730..1cdd891 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters.seqAsJavaListConverter
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils}
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.GetFunctionsOperation
@@ -104,11 +105,20 @@ private[hive] class SparkGetFunctionsOperation(
       }
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get functions operation with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting functions: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index 87ef154..928610a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
 import java.util.UUID
 import java.util.regex.Pattern
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.GetSchemasOperation
@@ -87,11 +88,20 @@ private[hive] class SparkGetSchemasOperation(
       }
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get schemas operation with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting schemas: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
index 8f2257f..ec03f1e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.util.UUID
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.GetTableTypesOperation
@@ -74,11 +75,20 @@ private[hive] class SparkGetTableTypesOperation(
       }
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get table types operation with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting table types: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 6441dc5..bf9cf7a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
 import org.apache.hive.service.cli._
@@ -30,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -119,11 +119,20 @@ private[hive] class SparkGetTablesOperation(
       }
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get tables operation with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting tables: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
index 7a6a8c5..0d263b0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.util.UUID
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.GetTypeInfoOperation
@@ -92,11 +93,20 @@ private[hive] class SparkGetTypeInfoOperation(
       })
       setState(OperationState.FINISHED)
     } catch {
-      case e: HiveSQLException =>
+      case e: Throwable =>
+        logError(s"Error executing get type info with $statementId", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw e
+        e match {
+          case hiveException: HiveSQLException =>
+            HiveThriftServer2.listener.onStatementError(
+              statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
+            throw hiveException
+          case _ =>
+            val root = ExceptionUtils.getRootCause(e)
+            HiveThriftServer2.listener.onStatementError(
+              statementId, root.getMessage, SparkUtils.exceptionString(root))
+            throw new HiveSQLException("Error getting type info: " + root.toString, root)
+        }
     }
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
