This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 865d86562 [KYUUBI #4711] JDBC client should catch task failed exception instead of NPE in the incremental mode
865d86562 is described below
commit 865d865629740d68150647e767316a3869c24ad4
Author: Fu Chen <[email protected]>
AuthorDate: Fri Apr 14 20:46:28 2023 +0800
[KYUUBI #4711] JDBC client should catch task failed exception instead of NPE in the incremental mode
### _Why are the changes needed?_
Since the job is submitted lazily in the incremental mode, the engine should not swallow the task failure exception even though the operation has already reached a terminal state; the exception should be propagated so that the JDBC client receives the real failure instead of the server hitting an NPE while fetching results.
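To make the lazy-submission point concrete, here is a minimal client-side sketch in Scala (the URL, config key, and error message are taken from the sessions below; the object name and driver setup are illustrative, assuming the Kyuubi Hive JDBC driver is on the classpath):
```
import java.sql.{DriverManager, SQLException}

object IncrementalFetchErrorExample {
  def main(args: Array[String]): Unit = {
    // placeholder URL from the beeline session below
    val conn = DriverManager.getConnection("jdbc:hive2://0.0.0.0:10009/")
    try {
      val stmt = conn.createStatement()
      stmt.execute("set kyuubi.operation.incremental.collect=true")
      val rs = stmt.executeQuery("SELECT raise_error('custom error message')")
      try {
        // in incremental mode the Spark job only runs when rows are fetched,
        // so the task failure surfaces here rather than at executeQuery time
        rs.next()
      } catch {
        case e: SQLException =>
          // with this patch the client sees the real failure message instead of
          // the server failing with a NullPointerException while fetching results
          println(s"caught: ${e.getMessage}")
      }
    } finally {
      conn.close()
    }
  }
}
```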
Before this PR:
```
0: jdbc:hive2://0.0.0.0:10009/> set kyuubi.operation.incremental.collect=true;
+---------------------------------------+--------+
| key | value |
+---------------------------------------+--------+
| kyuubi.operation.incremental.collect | true |
+---------------------------------------+--------+
0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
Error: (state=,code=0)
0: jdbc:hive2://0.0.0.0:10009/>
```
Kyuubi server log:
```
2023-04-14 18:47:50.185 ERROR org.apache.kyuubi.server.KyuubiTBinaryFrontendService: Error fetching results:
java.lang.NullPointerException: null
    at org.apache.kyuubi.server.BackendServiceMetric.$anonfun$fetchResults$1(BackendServiceMetric.scala:191) ~[classes/:?]
    at org.apache.kyuubi.metrics.MetricsSystem$.timerTracing(MetricsSystem.scala:111) ~[classes/:?]
    at org.apache.kyuubi.server.BackendServiceMetric.fetchResults(BackendServiceMetric.scala:187) ~[classes/:?]
    at org.apache.kyuubi.server.BackendServiceMetric.fetchResults$(BackendServiceMetric.scala:182) ~[classes/:?]
    at org.apache.kyuubi.server.KyuubiServer$$anon$1.fetchResults(KyuubiServer.scala:147) ~[classes/:?]
    at org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530) [classes/:?]
```
After this PR:
```
0: jdbc:hive2://0.0.0.0:10009/> set kyuubi.operation.incremental.collect=true;
+---------------------------------------+--------+
| key | value |
+---------------------------------------+--------+
| kyuubi.operation.incremental.collect | true |
+---------------------------------------+--------+
0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3) (0.0.0.0 executor driver): java.lang.RuntimeException: custom error message
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
    at org.apache.spark.rdd.RDD.collectPartition$1(RDD.scala:1036)
    at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3(RDD.scala:1038)
    at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3$adapted(RDD.scala:1038)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.kyuubi.operation.IterableFetchIterator.hasNext(FetchIterator.scala:97)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator.toStream(Iterator.scala:1417)
    at scala.collection.Iterator.toStream$(Iterator.scala:1416)
    at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
    at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
    at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$getNextRowSet$1(SparkOperation.scala:265)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation.getNextRowSet(SparkOperation.scala:243)
    at org.apache.kyuubi.operation.OperationManager.getOperationNextRowSet(OperationManager.scala:141)
    at org.apache.kyuubi.session.AbstractSession.fetchResults(AbstractSession.scala:240)
    at org.apache.kyuubi.service.AbstractBackendService.fetchResults(AbstractBackendService.scala:214)
    at org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530)
    at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837)
    at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: custom error message
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    ... 3 more (state=,code=0)
0: jdbc:hive2://0.0.0.0:10009/>
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4711 from cfmcgrady/incremental-show-error-msg.
Closes #4711
66bb527ce [Fu Chen] JDBC client should catch task failed exception in the incremental mode
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit db46b5b320ffc3e58f84a0c3bb0d113783b9612b)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/spark/operation/SparkOperation.scala | 5 +++--
.../kyuubi/operation/KyuubiOperationPerConnectionSuite.scala | 12 +++++++++++-
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index eb58407d4..cb7510a89 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -181,8 +181,9 @@ abstract class SparkOperation(session: Session)
           setOperationException(ke)
           throw ke
         } else if (isTerminalState(state)) {
-          setOperationException(KyuubiSQLException(errMsg))
-          warn(s"Ignore exception in terminal state with $statementId: $errMsg")
+          val ke = KyuubiSQLException(errMsg)
+          setOperationException(ke)
+          throw ke
         } else {
           error(s"Error operating $opType: $errMsg", e)
           val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 669475b6c..6c22664fe 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
 import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.jdbc.KyuubiHiveDriver
-import org.apache.kyuubi.jdbc.hive.KyuubiConnection
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.plugin.SessionConfAdvisor
 import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
@@ -280,6 +280,16 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
       assert(rs.getString(2) === KYUUBI_VERSION)
     }
   }
+
+  test("JDBC client should catch task failed exception in the incremental mode") {
+    withJdbcStatement() { statement =>
+      statement.executeQuery(s"set ${KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key}=true;")
+      val resultSet = statement.executeQuery(
+        "SELECT raise_error('client should catch this exception');")
+      val e = intercept[KyuubiSQLException](resultSet.next())
+      assert(e.getMessage.contains("client should catch this exception"))
+    }
+  }
}
class TestSessionConfAdvisor extends SessionConfAdvisor {