This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 865d86562 [KYUUBI #4711] JDBC client should catch task failed 
exception instead of NPE in the incremental mode
865d86562 is described below

commit 865d865629740d68150647e767316a3869c24ad4
Author: Fu Chen <[email protected]>
AuthorDate: Fri Apr 14 20:46:28 2023 +0800

    [KYUUBI #4711] JDBC client should catch task failed exception instead of 
NPE in the incremental mode
    
    ### _Why are the changes needed?_
    
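    In incremental mode the Spark job is submitted lazily, so a task failure can
    surface while results are being fetched, after the operation has already
    reached a terminal state. The engine should therefore propagate the task
    failed exception to the JDBC client instead of ignoring it.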
    
    Before this PR:
    
    ```
    0: jdbc:hive2://0.0.0.0:10009/> set 
kyuubi.operation.incremental.collect=true;
    +---------------------------------------+--------+
    |                  key                  | value  |
    +---------------------------------------+--------+
    | kyuubi.operation.incremental.collect  | true   |
    +---------------------------------------+--------+
    0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
    Error:  (state=,code=0)
    0: jdbc:hive2://0.0.0.0:10009/>
    ```
    
    Kyuubi server log:
    
    ```
    2023-04-14 18:47:50.185 ERROR 
org.apache.kyuubi.server.KyuubiTBinaryFrontendService: Error fetching results:
    java.lang.NullPointerException: null
            at 
org.apache.kyuubi.server.BackendServiceMetric.$anonfun$fetchResults$1(BackendServiceMetric.scala:191)
 ~[classes/:?]
            at 
org.apache.kyuubi.metrics.MetricsSystem$.timerTracing(MetricsSystem.scala:111) 
~[classes/:?]
            at 
org.apache.kyuubi.server.BackendServiceMetric.fetchResults(BackendServiceMetric.scala:187)
 ~[classes/:?]
            at 
org.apache.kyuubi.server.BackendServiceMetric.fetchResults$(BackendServiceMetric.scala:182)
 ~[classes/:?]
            at 
org.apache.kyuubi.server.KyuubiServer$$anon$1.fetchResults(KyuubiServer.scala:147)
 ~[classes/:?]
            at 
org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530)
 [classes/:?]
    ```
    
    After this PR:
    
    ```
    0: jdbc:hive2://0.0.0.0:10009/> set 
kyuubi.operation.incremental.collect=true;
    +---------------------------------------+--------+
    |                  key                  | value  |
    +---------------------------------------+--------+
    | kyuubi.operation.incremental.collect  | true   |
    +---------------------------------------+--------+
    0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
    Error: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 
3.0 (TID 3) (0.0.0.0 executor driver): java.lang.RuntimeException: custom error 
message
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
            at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
            at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
            at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:136)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
            at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
            at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
            at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
            at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
            at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
            at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
            at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
            at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
            at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
            at scala.Option.foreach(Option.scala:407)
            at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
            at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
            at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
            at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
            at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
            at org.apache.spark.rdd.RDD.collectPartition$1(RDD.scala:1036)
            at 
org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3(RDD.scala:1038)
            at 
org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3$adapted(RDD.scala:1038)
            at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at 
org.apache.kyuubi.operation.IterableFetchIterator.hasNext(FetchIterator.scala:97)
            at 
scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
            at scala.collection.Iterator.toStream(Iterator.scala:1417)
            at scala.collection.Iterator.toStream$(Iterator.scala:1416)
            at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
            at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
            at 
scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
            at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
            at 
org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$getNextRowSet$1(SparkOperation.scala:265)
            at 
org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
            at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
            at 
org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
            at 
org.apache.kyuubi.engine.spark.operation.SparkOperation.getNextRowSet(SparkOperation.scala:243)
            at 
org.apache.kyuubi.operation.OperationManager.getOperationNextRowSet(OperationManager.scala:141)
            at 
org.apache.kyuubi.session.AbstractSession.fetchResults(AbstractSession.scala:240)
            at 
org.apache.kyuubi.service.AbstractBackendService.fetchResults(AbstractBackendService.scala:214)
            at 
org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530)
            at 
org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837)
            at 
org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822)
            at 
org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
            at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
            at 
org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
            at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: custom error message
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
            at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
            at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
            at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:136)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
            ... 3 more (state=,code=0)
    0: jdbc:hive2://0.0.0.0:10009/>
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4711 from cfmcgrady/incremental-show-error-msg.
    
    Closes #4711
    
    66bb527ce [Fu Chen] JDBC client should catch task failed exception in the 
incremental mode
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit db46b5b320ffc3e58f84a0c3bb0d113783b9612b)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/spark/operation/SparkOperation.scala       |  5 +++--
 .../kyuubi/operation/KyuubiOperationPerConnectionSuite.scala | 12 +++++++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index eb58407d4..cb7510a89 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -181,8 +181,9 @@ abstract class SparkOperation(session: Session)
           setOperationException(ke)
           throw ke
         } else if (isTerminalState(state)) {
-          setOperationException(KyuubiSQLException(errMsg))
-          warn(s"Ignore exception in terminal state with $statementId: 
$errMsg")
+          val ke = KyuubiSQLException(errMsg)
+          setOperationException(ke)
+          throw ke
         } else {
           error(s"Error operating $opType: $errMsg", e)
           val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 669475b6c..6c22664fe 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.config.{KyuubiConf, 
KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
 import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.jdbc.KyuubiHiveDriver
-import org.apache.kyuubi.jdbc.hive.KyuubiConnection
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.plugin.SessionConfAdvisor
 import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
@@ -280,6 +280,16 @@ class KyuubiOperationPerConnectionSuite extends 
WithKyuubiServer with HiveJDBCTe
       assert(rs.getString(2) === KYUUBI_VERSION)
     }
   }
+
+  test("JDBC client should catch task failed exception in the incremental 
mode") {
+    withJdbcStatement() { statement =>
+      statement.executeQuery(s"set 
${KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key}=true;")
+      val resultSet = statement.executeQuery(
+        "SELECT raise_error('client should catch this exception');")
+      val e = intercept[KyuubiSQLException](resultSet.next())
+      assert(e.getMessage.contains("client should catch this exception"))
+    }
+  }
 }
 
 class TestSessionConfAdvisor extends SessionConfAdvisor {

Reply via email to