[ 
https://issues.apache.org/jira/browse/FLINK-24214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411981#comment-17411981
 ] 

Timo Walther commented on FLINK-24214:
--------------------------------------

It seems the latest refactorings of the SQL Client in 1.13 broke the exception 
design. Everything executed from within the `Executor` class in the SQL Client 
should be wrapped in a `SqlExecutionException`. Otherwise, the exception 
bubbles up, crashes the client, and produces the "Unexpected exception. This 
is a bug." message.
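
To illustrate the intended design, here is a minimal, self-contained sketch. 
It is not the actual SQL Client code: `SqlExecutionException` below is a local 
stand-in for the real class, and `ExecutorSketch`, `CliSketch`, and the message 
texts are hypothetical names chosen for illustration. The assumption is that 
the CLI handles `SqlExecutionException` itself and only treats other 
exceptions as bugs.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/** Local stand-in for the SQL Client's SqlExecutionException (assumption). */
class SqlExecutionException extends RuntimeException {
    SqlExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical, simplified executor; not Flink's actual LocalExecutor. */
class ExecutorSketch {
    /** Runs a submission and wraps any failure into SqlExecutionException. */
    static <T> T execute(Callable<T> submission) {
        try {
            return submission.call();
        } catch (SqlExecutionException e) {
            throw e; // already in the form the CLI expects
        } catch (Exception e) {
            // Nothing else may escape the executor, so wrap it here.
            throw new SqlExecutionException("Could not execute SQL statement.", e);
        }
    }
}

/** Hypothetical CLI step: report the failure and keep the session alive. */
class CliSketch {
    /** Returns the text the CLI would show for one statement. */
    static String call(Callable<String> submission) {
        try {
            return ExecutorSketch.execute(submission);
        } catch (SqlExecutionException e) {
            // Walk down to the root cause, which usually has the useful message.
            Throwable root = e;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            return "[ERROR] " + e.getMessage() + " Reason: " + root.getMessage();
        }
    }

    public static void main(String[] args) {
        // A successful submission passes its result through unchanged.
        System.out.println(call(() -> "[INFO] Job submitted."));
        // A failed submission is reported instead of crashing the client.
        System.out.println(call(() -> {
            throw new ExecutionException(
                    new IOException("File or directory already exists."));
        }));
    }
}
```

With this shape, a failed job submission ends up as an error line in the 
session rather than as an uncaught exception in `main`, and the client can go 
on to read the next statement.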

> A submit job failure crashes the sql client
> -------------------------------------------
>
>                 Key: FLINK-24214
>                 URL: https://issues.apache.org/jira/browse/FLINK-24214
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.13.2
>         Environment: Flink 1.13.2
> Ubuntu 21.04
> Java 8
>            Reporter: Francesco Guardiani
>            Priority: Not a Priority
>
> I've noticed that when executing a valid query, if a "bad" error occurs 
> while submitting it to the Flink cluster, the client crashes with a 
> misleading beginning of the stack trace. For example:
> {code:java}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>       at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: java.lang.RuntimeException: Error running SQL job.
>       at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:606)
>       at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>       at 
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
>       at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
>       at java.util.Optional.ifPresent(Optional.java:159)
>       at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>       at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>       at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>       at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:603)
>       ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
> server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
>       at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>       at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>       ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
>       at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>       ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
> initialize task 'Source: KafkaTableSource(o_orderkey, o_custkey, 
> o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, 
> o_clerk, o_shippriority, o_comment) -> 
> SourceConversion(table=[default_catalog.default_database.prod_orders, source: 
> [KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, 
> o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, 
> o_comment)]], fields=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, 
> o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, 
> o_comment]) -> Timestamps/Watermarks -> SinkConversionToRow -> Map -> Sink: 
> CsvTableSink(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, 
> o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment)': File or 
> directory already exists. Existing files and directories are not overwritten 
> in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and 
> directories.
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
>       at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>       ... 10 more
> Caused by: java.io.IOException: File or directory already exists. Existing 
> files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE 
> mode to overwrite existing files and directories.
>       at 
> org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:940)
>       at 
> org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:100)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
>       ... 20 more
> End of exception on server side>]
>       at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>       at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>       ... 4 more
> Shutting down the session...
> done.
> {code}
> Another example is in this issue: 
> https://issues.apache.org/jira/browse/FLINK-22188
> Shouldn't we catch these exceptions and report them nicely, without crashing 
> the client? For example, the exception could be written to stderr and to a 
> log file, and the client could then wait for the next input query.
> Also, the initial lines of the stack trace are misleading:
> {code}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
> {code}
> This happens even though the cause is clearly a connector misconfiguration 
> on my side.
> What do you think about it? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
