This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.0-preview
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0-preview by this
push:
new eb9935a [SPARK-29283][SQL] Error message is hidden when query from
JDBC, especially enabled adaptive execution
eb9935a is described below
commit eb9935ab5fed14c46e94e20563acea79ae001044
Author: lajin <[email protected]>
AuthorDate: Wed Oct 16 19:51:56 2019 -0700
[SPARK-29283][SQL] Error message is hidden when query from JDBC, especially
enabled adaptive execution
### What changes were proposed in this pull request?
When adaptive execution is enabled, Spark users who connect from JDBC
always get an adaptive execution error, whatever the underlying root cause is.
It's very confusing. We have to check the driver log to find out why.
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON
key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive
execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
For example, a job queried from JDBC failed due to a missing HDFS block. The
user still gets the error message `Adaptive execution failed due to stage
materialization failures`.
The easiest way to reproduce it is to change the code of
`AdaptiveSparkPlanExec` so that it throws an exception when it encounters
`StageSuccess`.
```scala
case class AdaptiveSparkPlanExec(
events.drainTo(rem)
(Seq(nextMsg) ++ rem.asScala).foreach {
case StageSuccess(stage, res) =>
// stage.resultOption = Some(res)
val ex = new SparkException("Wrapper Exception",
new IllegalArgumentException("Root cause is
IllegalArgumentException for Test"))
errors.append(
new SparkException(s"Failed to materialize query stage:
${stage.treeString}", ex))
case StageFailure(stage, ex) =>
errors.append(
new SparkException(s"Failed to materialize query stage:
${stage.treeString}", ex))
```
### Why are the changes needed?
To make the error message more user-friendly and more useful for queries from
JDBC.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually test query:
```shell
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key,
value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as
string);
CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2,
3, 4)), cast(substring(rand(), 3, 4) as string);
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (0.225 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS
SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2,
2)), cast(substring(rand(), 3, 4) as int);
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (0.043 seconds)
```
Before:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON
key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive
execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
After:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON
key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: java.lang.IllegalArgumentException: Root cause
is IllegalArgumentException for Test (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
Closes #25960 from LantaoJin/SPARK-29283.
Authored-by: lajin <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit fda4070ea934cac081162f70d9ea7fe2e9a07cd4)
Signed-off-by: Yuming Wang <[email protected]>
---
.../thriftserver/SparkExecuteStatementOperation.scala | 17 +++++++++++------
.../hive/thriftserver/SparkGetCatalogsOperation.scala | 18 ++++++++++++++----
.../hive/thriftserver/SparkGetColumnsOperation.scala | 18 ++++++++++++++----
.../thriftserver/SparkGetFunctionsOperation.scala | 18 ++++++++++++++----
.../hive/thriftserver/SparkGetSchemasOperation.scala | 18 ++++++++++++++----
.../thriftserver/SparkGetTableTypesOperation.scala | 18 ++++++++++++++----
.../hive/thriftserver/SparkGetTablesOperation.scala | 19 ++++++++++++++-----
.../hive/thriftserver/SparkGetTypeInfoOperation.scala | 18 ++++++++++++++----
8 files changed, 109 insertions(+), 35 deletions(-)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 90a428d..68197a9 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli._
@@ -312,12 +313,16 @@ private[hive] class SparkExecuteStatementOperation(
} else {
logError(s"Error executing query with $statementId, currentState
$currentState, ", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- if (e.isInstanceOf[HiveSQLException]) {
- throw e.asInstanceOf[HiveSQLException]
- } else {
- throw new HiveSQLException("Error running query: " + e.toString, e)
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error running query: " +
root.toString, root)
}
}
} finally {
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
index cde99fd..6c8a5b0 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetCatalogsOperation
@@ -68,11 +69,20 @@ private[hive] class SparkGetCatalogsOperation(
}
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get catalogs operation with $statementId",
e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting catalogs: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 89faff2..f845a22 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters.seqAsJavaListConverter
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType,
HivePrivilegeObject}
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
import org.apache.hive.service.cli._
@@ -129,11 +130,20 @@ private[hive] class SparkGetColumnsOperation(
}
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get columns operation with $statementId", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting columns: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 462e5730..1cdd891 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -22,6 +22,7 @@ import java.util.UUID
import scala.collection.JavaConverters.seqAsJavaListConverter
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType,
HivePrivilegeObjectUtils}
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetFunctionsOperation
@@ -104,11 +105,20 @@ private[hive] class SparkGetFunctionsOperation(
}
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get functions operation with $statementId",
e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting functions: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index 87ef154..928610a 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
import java.util.regex.Pattern
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetSchemasOperation
@@ -87,11 +88,20 @@ private[hive] class SparkGetSchemasOperation(
}
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get schemas operation with $statementId", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting schemas: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
index 8f2257f..ec03f1e 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTableTypesOperation
@@ -74,11 +75,20 @@ private[hive] class SparkGetTableTypesOperation(
}
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get table types operation with
$statementId", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting table types: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 6441dc5..bf9cf7a 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters._
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
import org.apache.hive.service.cli._
@@ -30,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.{Utils => SparkUtils}
@@ -119,11 +119,20 @@ private[hive] class SparkGetTablesOperation(
}
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get tables operation with $statementId", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting tables: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
index 7a6a8c5..0d263b0 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
+import org.apache.commons.lang3.exception.ExceptionUtils
import
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
@@ -92,11 +93,20 @@ private[hive] class SparkGetTypeInfoOperation(
})
setState(OperationState.FINISHED)
} catch {
- case e: HiveSQLException =>
+ case e: Throwable =>
+ logError(s"Error executing get type info with $statementId", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw e
+ e match {
+ case hiveException: HiveSQLException =>
+ HiveThriftServer2.listener.onStatementError(
+ statementId, hiveException.getMessage,
SparkUtils.exceptionString(hiveException))
+ throw hiveException
+ case _ =>
+ val root = ExceptionUtils.getRootCause(e)
+ HiveThriftServer2.listener.onStatementError(
+ statementId, root.getMessage, SparkUtils.exceptionString(root))
+ throw new HiveSQLException("Error getting type info: " +
root.toString, root)
+ }
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]