[ 
https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-15669:
-------------------------------
    Description: 
in sql client, CLI client do cancel query through {{void cancelQuery(String 
sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} 
is a random UUID, is not the job id. So CLI client can't cancel a running job.


related code in {{LocalExecutor}}:
{code:java}
        private <C> ResultDescriptor executeQueryInternal(String sessionId, 
ExecutionContext<C> context, String query) {
                ......

                // store the result with a unique id
                final String resultId = UUID.randomUUID().toString();
                resultStore.storeResult(resultId, result);

               ......

                // create execution
                final ProgramDeployer deployer = new ProgramDeployer(
                                configuration, jobName, pipeline);

                // start result retrieval
                result.startRetrieval(deployer);

                return new ResultDescriptor(
                                resultId,
                                removeTimeAttributes(table.getSchema()),
                                result.isMaterialized());
        }



private <T> void cancelQueryInternal(ExecutionContext<T> context, String 
resultId) {
                ......

                // stop Flink job
                try (final ClusterDescriptor<T> clusterDescriptor = 
context.createClusterDescriptor()) {
                        ClusterClient<T> clusterClient = null;
                        try {
                                // retrieve existing cluster
                                clusterClient = 
clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
                                try {
                                        clusterClient.cancel(new 
JobID(StringUtils.hexStringToByte(resultId))).get();
                                } catch (Throwable t) {
                                        // the job might has finished earlier
                                }
                        } catch (Exception e) {
                                throw new SqlExecutionException("Could not 
retrieve or create a cluster.", e);
                        } finally {
                                try {
                                        if (clusterClient != null) {
                                                clusterClient.close();
                                        }
                                } catch (Exception e) {
                                        // ignore
                                }
                        }
                } catch (SqlExecutionException e) {
                        throw e;
                } catch (Exception e) {
                        throw new SqlExecutionException("Could not locate a 
cluster.", e);
                }
        }
{code}





  was:
in sql client, CLI client do cancel query through {{void cancelQuery(String 
sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} 
is a random UUID, is not the job id. So CLI client can't cancel a running job.


related code in {{LocalExecutor}}:
{code:java}
        private <C> ResultDescriptor executeQueryInternal(String sessionId, 
ExecutionContext<C> context, String query) {
                ......

                // store the result with a unique id
                final String resultId = UUID.randomUUID().toString();
                resultStore.storeResult(resultId, result);

               ......

                // create execution
                final ProgramDeployer deployer = new ProgramDeployer(
                                configuration, jobName, pipeline);

                // start result retrieval
                result.startRetrieval(deployer);

                return new ResultDescriptor(
                                resultId,
                                removeTimeAttributes(table.getSchema()),
                                result.isMaterialized());
        }

private <T> void cancelQueryInternal(ExecutionContext<T> context, String 
resultId) {
                ......

                // stop Flink job
                try (final ClusterDescriptor<T> clusterDescriptor = 
context.createClusterDescriptor()) {
                        ClusterClient<T> clusterClient = null;
                        try {
                                // retrieve existing cluster
                                clusterClient = 
clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
                                try {
                                        clusterClient.cancel(new 
JobID(StringUtils.hexStringToByte(resultId))).get();
                                } catch (Throwable t) {
                                        // the job might has finished earlier
                                }
                        } catch (Exception e) {
                                throw new SqlExecutionException("Could not 
retrieve or create a cluster.", e);
                        } finally {
                                try {
                                        if (clusterClient != null) {
                                                clusterClient.close();
                                        }
                                } catch (Exception e) {
                                        // ignore
                                }
                        }
                } catch (SqlExecutionException e) {
                        throw e;
                } catch (Exception e) {
                        throw new SqlExecutionException("Could not locate a 
cluster.", e);
                }
        }
{code}






> SQL client can't cancel flink job
> ---------------------------------
>
>                 Key: FLINK-15669
>                 URL: https://issues.apache.org/jira/browse/FLINK-15669
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: godfrey he
>            Priority: Major
>             Fix For: 1.10.0
>
>
> in sql client, CLI client do cancel query through {{void cancelQuery(String 
> sessionId, String resultId)}} method in {{Executor}}. However, the 
> {{resultId}} is a random UUID, is not the job id. So CLI client can't cancel 
> a running job.
> related code in {{LocalExecutor}}:
> {code:java}
>       private <C> ResultDescriptor executeQueryInternal(String sessionId, 
> ExecutionContext<C> context, String query) {
>               ......
>               // store the result with a unique id
>               final String resultId = UUID.randomUUID().toString();
>               resultStore.storeResult(resultId, result);
>              ......
>               // create execution
>               final ProgramDeployer deployer = new ProgramDeployer(
>                               configuration, jobName, pipeline);
>               // start result retrieval
>               result.startRetrieval(deployer);
>               return new ResultDescriptor(
>                               resultId,
>                               removeTimeAttributes(table.getSchema()),
>                               result.isMaterialized());
>       }
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String 
> resultId) {
>               ......
>               // stop Flink job
>               try (final ClusterDescriptor<T> clusterDescriptor = 
> context.createClusterDescriptor()) {
>                       ClusterClient<T> clusterClient = null;
>                       try {
>                               // retrieve existing cluster
>                               clusterClient = 
> clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
>                               try {
>                                       clusterClient.cancel(new 
> JobID(StringUtils.hexStringToByte(resultId))).get();
>                               } catch (Throwable t) {
>                                       // the job might has finished earlier
>                               }
>                       } catch (Exception e) {
>                               throw new SqlExecutionException("Could not 
> retrieve or create a cluster.", e);
>                       } finally {
>                               try {
>                                       if (clusterClient != null) {
>                                               clusterClient.close();
>                                       }
>                               } catch (Exception e) {
>                                       // ignore
>                               }
>                       }
>               } catch (SqlExecutionException e) {
>                       throw e;
>               } catch (Exception e) {
>                       throw new SqlExecutionException("Could not locate a 
> cluster.", e);
>               }
>       }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to