[ 
https://issues.apache.org/jira/browse/FLINK-19049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185024#comment-17185024
 ] 

Nicholas Jiang commented on FLINK-19049:
----------------------------------------

[~rmetzger] The entire definition of executeSql is asynchronous execution. If 
you use execute, you can only return to TableResult (attach mode) after the job 
ends, and the getJobClient of TableResult is meaningless.

> TableEnvironmentImpl.executeInternal() does not wait for the final job status
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19049
>                 URL: https://issues.apache.org/jira/browse/FLINK-19049
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Robert Metzger
>            Priority: Major
>
> While working on another change, I realized that the 
> {{FunctionITCase.testInvalidUseOfTableFunction()}} tests throws a 
> NullPointerException during execution.
> This error is not visible, because TableEnvironmentImpl.executeInternal() 
> does not wait for the final job status.
> It submits the job using the job client ({{JobClient jobClient = 
> execEnv.executeAsync(pipeline);}}), and it doesn't wait for the job to 
> complete before returning a result. 
> This is the null pointer that is hidden:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'insert-into_default_catalog.default_database.SinkTable'.
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1823)
>       at 
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
>       ... 34 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>       at 
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:148)
>       at 
> org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$submitJob$2(PerJobMiniClusterFactory.java:92)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at 
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:195)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:188)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:182)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:523)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:422)
>       at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:43)
>       at 
> StreamExecCalc$245$TableFunctionResultConverterCollector$243.collect(Unknown 
> Source)
>       at 
> org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:201)
>       at 
> org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase$RowTableFunction.eval(FunctionITCase.java:1024)
>       at StreamExecCalc$245.processElement(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>       at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to