This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new c1ceb6b98 [KYUUBI #5531][TEST] Fix flaky FlinkOperationOnYarnSuite by enlarging the max rows setting
c1ceb6b98 is described below

commit c1ceb6b9851d9b5712d10b2e6105d994a18677a3
Author: davidyuan <[email protected]>
AuthorDate: Mon Oct 30 13:12:43 2023 +0800

    [KYUUBI #5531][TEST] Fix flaky FlinkOperationOnYarnSuite by enlarging the max rows setting
    
    ### _Why are the changes needed?_
    
    #### 1. About this PR
    When running the Flink (1.17+) test cases, the `show/stop jobs` test case may fail intermittently with the following exception:
    ```
    - execute statement - show/stop jobs *** FAILED ***
      org.apache.kyuubi.jdbc.hive.KyuubiSQLException: Error operating ExecuteStatement: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not stop job 4dece26857fab91d63fad1abd8c6bdd0 with savepoint for operation 9ed8247a-b7bd-4004-875b-61ba654ab3dd.
            at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:628)
            at org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:716)
            at org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:601)
            at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:434)
            at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
            at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:64)
            at org.apache.kyuubi.engine.flink.operation.ExecuteStatement.runInternal(ExecuteStatement.scala:56)
            at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171)
            at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:101)
            at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:131)
            at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:82)
            at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:128)
            at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:67)
            at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:252)
            at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
            at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
            at org.apache.kyuubi.shade.org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
            at org.apache.kyuubi.shade.org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
            at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
            at org.apache.kyuubi.shade.org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
            at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
            at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
            at org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$11(OperationExecutor.java:617)
            ... 22 more
    Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
            at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
            at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
            at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
            at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
            at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
            at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
            at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
            at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1298)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
            at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
            at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
            at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
            at akka.dispatch.OnComplete.internal(Future.scala:299)
            at akka.dispatch.OnComplete.internal(Future.scala:297)
            at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
            at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
      at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:171)
      ...
      Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
      at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
      at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
      at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
      at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
      at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
      at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      ...
      Cause: java.lang.RuntimeException: org.apache.flink.util.SerializedThrowable:org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: tbl_a[6] -> Sink: tbl_b[7] (1/1) of job 4dece26857fab91d63fad1abd8c6bdd0 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
      at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
      at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
      at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
      at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
      at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
      at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
      ...
    ```
    #### 2. What makes the test case fail?
    To understand the exception, we need to understand how Flink stops a job. The process is outlined below. It is taken from the flaky test case, and the same steps can be used to diagnose similar problems:
    ```
    1. SQL
       1.1 create table tbl_a (a int) with ('connector' = 'datagen','rows-per-second'='10')
       1.2 create table tbl_b (a int) with ('connector' = 'blackhole')
       1.3 insert into tbl_b select * from tbl_a
    2. start job: the job contains a source (tbl_a) and a sink (tbl_b)
    3. show job: we can get the job info
    4. stop job (where the error occurs):
       4.1 stopping the job with a savepoint triggers a checkpoint
       4.2 triggering the checkpoint requires all tasks to be in the RUNNING state
       4.3 if not all tasks are running, the checkpoint is aborted and the exception is thrown
    ```
    In a normal setup this exception should not occur. If it happens to your job, check the Kyuubi configuration `kyuubi.session.engine.flink.max.rows`, whose default value is 1000000. In the test case, however, the value was set to only 10.
    This is what causes the error: the setting makes Kyuubi cancel a streaming query once the row limit is reached. Flink's `datagen` connector is a streaming source, so with this configuration the query quickly reaches 10 rows and the sink task is canceled. When we then stop the job, the checkpoint finds that not all of the job's tasks are in the RUNNING state, so Flink aborts it and throws this exception.
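    The mechanism above can be sketched as a small Scala model. This is a hypothetical illustration, not Flink or Kyuubi code: `StopJobModel`, `Task`, `runFor` and `triggerCheckpoint` are made-up names, and the model simply assumes, as described above, that reaching the row limit cancels the sink while the source keeps running, and that a checkpoint requires every task to be RUNNING.

    ```scala
    // Hypothetical, simplified model of the failure described above.
    // None of these types or methods are Flink or Kyuubi APIs.
    object StopJobModel {
      sealed trait TaskState
      case object Running extends TaskState
      case object Canceled extends TaskState

      final case class Task(name: String, state: TaskState)

      /** Assumed row-limit behavior: once `maxRows` rows have been produced,
        * the sink task is canceled; the source keeps running. */
      def runFor(seconds: Int, rowsPerSecond: Int, maxRows: Long): Seq[Task] = {
        val produced = seconds.toLong * rowsPerSecond
        val sinkState = if (produced >= maxRows) Canceled else Running
        Seq(Task("Source: tbl_a", Running), Task("Sink: tbl_b", sinkState))
      }

      /** Assumed checkpoint precondition: every task must be RUNNING. */
      def triggerCheckpoint(tasks: Seq[Task]): Either[String, String] =
        if (tasks.forall(_.state == Running)) Right("checkpoint triggered")
        else Left("Not all required tasks are currently running.")

      def main(args: Array[String]): Unit = {
        // 10 rows/s for 5 s = 50 rows, which exceeds maxRows = 10: the checkpoint is rejected.
        println(triggerCheckpoint(runFor(5, 10, maxRows = 10)))
        // The same 50 rows stay below maxRows = 10000: the checkpoint can start.
        println(triggerCheckpoint(runFor(5, 10, maxRows = 10000)))
      }
    }
    ```

    With 10 rows per second, five seconds of input already exceeds a limit of 10 rows, so the modeled checkpoint is rejected; the same input stays far below a limit of 10000.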
    #### 3. How can we solve this problem?
    If your job fails with the same exception, check whether the value of `kyuubi.session.engine.flink.max.rows` is large enough for your streaming query, and increase it if it is not. This patch raises the value used by the test from 10 to 10000.
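    A rough way to check whether a limit is large enough is to estimate how long a source takes to reach it: seconds until the limit is reached is approximately maxRows / rowsPerSecond. The sketch below applies this to the test's `datagen` rate of 10 rows per second. It assumes a constant emission rate and ignores job start-up time; `MaxRowsBudget`, `secondsUntilLimit` and `isLargeEnough` are hypothetical helpers, not Kyuubi APIs.

    ```scala
    // Hypothetical helper for sizing kyuubi.session.engine.flink.max.rows.
    // Assumes the source emits rows at a constant rate (as datagen's rows-per-second does).
    object MaxRowsBudget {
      /** Approximate seconds until a source emitting `rowsPerSecond` rows reaches `maxRows`. */
      def secondsUntilLimit(maxRows: Long, rowsPerSecond: Long): Double = {
        require(rowsPerSecond > 0, "rowsPerSecond must be positive")
        maxRows.toDouble / rowsPerSecond
      }

      /** True if the limit leaves at least `requiredSeconds` before it is reached. */
      def isLargeEnough(maxRows: Long, rowsPerSecond: Long, requiredSeconds: Double): Boolean =
        secondsUntilLimit(maxRows, rowsPerSecond) >= requiredSeconds

      def main(args: Array[String]): Unit = {
        val rowsPerSecond = 10L // matches 'rows-per-second'='10' in the test's tbl_a
        for (maxRows <- Seq(10L, 10000L, 1000000L)) {
          val secs = secondsUntilLimit(maxRows, rowsPerSecond)
          println(f"max.rows=$maxRows%d -> limit reached after ~$secs%.0f s")
        }
      }
    }
    ```

    With 10 rows per second, a limit of 10 is reached after about 1 second, 10000 after about 1000 seconds (roughly 17 minutes), and the default of 1000000 after about 100000 seconds, so raising the test's value to 10000 leaves ample time to show and stop the job before the limit is reached.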
    
    close #5531
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5549 from davidyuan1223/fix_flink_test_bug.
    
    Closes #5531
    
    ce7fd7961 [david yuan] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
    dc3a4b9ba [davidyuan] fix flink on yarn test bug
    86a647ad9 [davidyuan] fix flink on yarn test bug
    cbd4c0c3d [davidyuan] fix flink on yarn test bug
    8b51840bc [davidyuan] add common method to get session level config
    bcb0cf372 [davidyuan] Merge remote-tracking branch 'origin/master'
    72e7aea3c [david yuan] Merge branch 'apache:master' into master
    57ec746e9 [david yuan] Merge pull request #13 from davidyuan1223/fix
    56b91a321 [yuanfuyuan] fix_4186
    c8eb9a2c7 [david yuan] Merge branch 'apache:master' into master
    2beccb6ca [david yuan] Merge branch 'apache:master' into master
    0925a4b6f [david yuan] Merge pull request #12 from davidyuan1223/revert-11-fix_4186
    40e80d9a8 [david yuan] Revert "fix_4186"
    c83836b43 [david yuan] Merge pull request #11 from davidyuan1223/fix_4186
    360d183b0 [david yuan] Merge branch 'master' into fix_4186
    b61604442 [yuanfuyuan] fix_4186
    e244029b8 [david yuan] Merge branch 'apache:master' into master
    bfa6cbf97 [davidyuan1223] Merge branch 'apache:master' into master
    16237c2a9 [davidyuan1223] Merge branch 'apache:master' into master
    c48ad38c7 [yuanfuyuan] remove the used blank lines
    55a0a43c5 [xiaoyuandajian] Merge pull request #10 from xiaoyuandajian/fix-#4057
    cb1193576 [yuan] Merge remote-tracking branch 'origin/fix-#4057' into fix-#4057
    86e4e1ce0 [yuan] fix-#4057 info: modify the shellcheck errors file in ./bin 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/< 4. not sure the /bin/kyuubi line 63 'exit -1' need modify? so the directory bin only have a shellcheck note in /bin/kyuubi
    dd39efdeb [袁福元] fix-#4057 info: 1. "$@" is a array, we want use string to compare. so update "$@" => "$*" 2. `tty` mean execute the command, we can use $(tty) replace it 3. param $# is a number, compare number should use -gt/-lt,not >/<
    
    Lead-authored-by: davidyuan <[email protected]>
    Co-authored-by: david yuan <[email protected]>
    Co-authored-by: yuanfuyuan <[email protected]>
    Co-authored-by: yuan <[email protected]>
    Co-authored-by: davidyuan1223 <[email protected]>
    Co-authored-by: xiaoyuandajian <[email protected]>
    Co-authored-by: 袁福元 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 26f614a504584e18cc8493cb213f7ec5ec9c2ea6)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 8e7c35a95..9469cf286 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -637,7 +637,9 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
 
   test("execute statement - show/stop jobs") {
     if (FLINK_RUNTIME_VERSION >= "1.17") {
-      withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+      // use a bigger value to ensure all tasks of the streaming query run until
+      // we explicitly stop the job.
+      withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10000"))(Map.empty) {
         withMultipleConnectionJdbcStatement()({ statement =>
           statement.executeQuery(
             "create table tbl_a (a int) with (" +
