[ https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek reopened FLINK-15669:
--------------------------------------

I partially reverted these changes: only the tests are reverted, because the
fix itself is still good. The added tests don't work because of a race
condition: if the query finishes before the test cancels it, the job status
will be FINISHED and not CANCELLED, which is what the test expects.
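One way a test could sidestep this race is to accept either terminal status after issuing the cancel. This is a sketch under stated assumptions, not what the Flink tests actually do. `ObservedStatus` and `CancelRaceCheck` are hypothetical names, and a real test would read the job's status from the cluster instead of receiving it as a parameter.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the job statuses named in this message;
// Flink's own status enum is not used so the sketch compiles on its own.
enum ObservedStatus { RUNNING, FINISHED, CANCELLED, FAILED }

final class CancelRaceCheck {
    // Cancelling races with normal completion: a short query may finish before
    // the cancel request arrives, so either terminal state counts as success.
    private static final Set<ObservedStatus> ACCEPTABLE =
            EnumSet.of(ObservedStatus.FINISHED, ObservedStatus.CANCELLED);

    static void assertCancelledOrFinished(ObservedStatus observed) {
        if (!ACCEPTABLE.contains(observed)) {
            throw new AssertionError(
                    "expected FINISHED or CANCELLED but was " + observed);
        }
    }

    public static void main(String[] args) {
        assertCancelledOrFinished(ObservedStatus.FINISHED);  // query won the race
        assertCancelledOrFinished(ObservedStatus.CANCELLED); // cancel won the race
        System.out.println("both outcomes accepted");
    }
}
```

The trade-off is that such a test no longer proves cancellation worked whenever the query happens to finish first. The alternative is to use a query that cannot finish on its own, such as one reading from an unbounded source, so that CANCELLED is the only possible outcome.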

revert on release-1.10: 4518d18a726b35de9ff802d155fd8100dc711a63
revert on master: cbaea2924a90a1141d9794f6cd8f561e77ddc11a

> SQL client can't cancel flink job
> ---------------------------------
>
>                 Key: FLINK-15669
>                 URL: https://issues.apache.org/jira/browse/FLINK-15669
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.10.1, 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the SQL client, the CLI client cancels a query through the
> {{void cancelQuery(String sessionId, String resultId)}} method of {{Executor}}.
> However, the {{resultId}} is a random UUID, not the job ID, so the CLI client
> can't cancel a running job.
> Related code in {{LocalExecutor}}:
> {code:java}
> private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
>       ......
>       // store the result with a unique id
>       final String resultId = UUID.randomUUID().toString();
>       resultStore.storeResult(resultId, result);
>       ......
>       // create execution
>       final ProgramDeployer deployer = new ProgramDeployer(
>               configuration, jobName, pipeline);
>       // start result retrieval
>       result.startRetrieval(deployer);
>       return new ResultDescriptor(
>                       resultId,
>                       removeTimeAttributes(table.getSchema()),
>                       result.isMaterialized());
> }
>
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
>       ......
>       // stop Flink job
>       try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
>               ClusterClient<T> clusterClient = null;
>               try {
>                       // retrieve existing cluster
>                       clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
>                       try {
>                               // ======== cancel job through resultId =======
>                               clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
>                       } catch (Throwable t) {
>                               // the job might have finished earlier
>                       }
>               } catch (Exception e) {
>                       throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
>               } finally {
>                       try {
>                               if (clusterClient != null) {
>                                       clusterClient.close();
>                               }
>                       } catch (Exception e) {
>                               // ignore
>                       }
>               }
>       } catch (SqlExecutionException e) {
>               throw e;
>       } catch (Exception e) {
>               throw new SqlExecutionException("Could not locate a cluster.", e);
>       }
> }
> {code}
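The core problem is that {{cancelQueryInternal}} reinterprets the random {{resultId}} as a job ID. Both the conversion and the {{cancel}} call sit inside the {{try}} whose {{catch (Throwable t)}} ignores every error, so the failed cancellation is silent.

One way around this is for the executor to record the submitted job's ID next to the result ID and look it up on cancel. The sketch below assumes the executor can learn that ID somehow. {{ResultJobRegistry}} and its methods are hypothetical, and job IDs are plain hex strings rather than Flink {{JobID}} objects so the sketch stands alone. It is not necessarily how the merged fix works.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: remembers which job each result belongs to, so that
// cancellation uses the job's real ID instead of decoding the random result ID.
final class ResultJobRegistry {
    // Concurrent map because queries may be submitted and cancelled from
    // different threads.
    private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

    // Called once the job has been submitted and its ID is known; returns the
    // random result ID handed back to the CLI, as in executeQueryInternal.
    String register(String jobId) {
        String resultId = UUID.randomUUID().toString();
        jobIdByResultId.put(resultId, jobId);
        return resultId;
    }

    // Looks up the job to cancel; empty if the result ID is unknown.
    Optional<String> jobIdFor(String resultId) {
        return Optional.ofNullable(jobIdByResultId.get(resultId));
    }

    // Drops the mapping once the result has been cancelled or cleaned up.
    void remove(String resultId) {
        jobIdByResultId.remove(resultId);
    }
}
```

In the excerpt's terms, the executor would call {{jobIdFor(resultId)}} when cancelling. It would convert and cancel only when a job ID is present, and it could report an unknown result ID instead of silently swallowing the failure.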



--
This message was sent by Atlassian Jira
(v8.3.4#803005)