[ 
https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-15669:
-------------------------------
    Description: 
In the SQL client, the CLI client performs the cancel-query operation through 
the {{void cancelQuery(String sessionId, String resultId)}} method in 
{{Executor}}. However, the {{resultId}} is a random UUID, not the job ID, so 
the CLI client can't cancel a running job.


Related code in {{LocalExecutor}}:
{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
        ......

        // store the result with a unique id
        final String resultId = UUID.randomUUID().toString();
        resultStore.storeResult(resultId, result);

        ......

        // create execution
        final ProgramDeployer deployer = new ProgramDeployer(
                configuration, jobName, pipeline);

        // start result retrieval
        result.startRetrieval(deployer);

        return new ResultDescriptor(
                        resultId,
                        removeTimeAttributes(table.getSchema()),
                        result.isMaterialized());
}



private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
        ......

        // stop Flink job
        try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
                ClusterClient<T> clusterClient = null;
                try {
                        // retrieve existing cluster
                        clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
                        try {
                                // ======== cancel job through resultId =======
                                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
                        } catch (Throwable t) {
                                // the job might have finished earlier
                        }
                } catch (Exception e) {
                        throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
                } finally {
                        try {
                                if (clusterClient != null) {
                                        clusterClient.close();
                                }
                        } catch (Exception e) {
                                // ignore
                        }
                }
        } catch (SqlExecutionException e) {
                throw e;
        } catch (Exception e) {
                throw new SqlExecutionException("Could not locate a cluster.", e);
        }
}
{code}
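
To make the mismatch concrete, here is a minimal standalone sketch (plain Java, no Flink dependency). It relies on two facts: {{UUID.toString()}} yields 36 characters including hyphens, while a Flink {{JobID}} is a 16-byte identifier, i.e. 32 hex characters. The class {{ResultJobRegistry}}, its methods, and the sample job id are hypothetical names used only for illustration. One possible direction, assuming the job id is available at submission time, would be to remember it under the {{resultId}} and look it up on cancel. This is not the actual fix.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; none of these names exist in Flink. */
public class ResultJobRegistry {

    // Maps the client-facing result id to the hex job id known at submission.
    private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

    /** True if s has the shape of a 16-byte id written as 32 hex characters. */
    public static boolean looksLikeJobIdHex(String s) {
        return s != null && s.matches("[0-9a-fA-F]{32}");
    }

    /** Remember which job produced the given result. */
    public void register(String resultId, String jobIdHex) {
        if (!looksLikeJobIdHex(jobIdHex)) {
            throw new IllegalArgumentException("Not a 32-character hex job id: " + jobIdHex);
        }
        jobIdByResultId.put(resultId, jobIdHex);
    }

    /** Look up the job id to cancel; empty if the result id is unknown. */
    public Optional<String> jobIdFor(String resultId) {
        return Optional.ofNullable(jobIdByResultId.get(resultId));
    }

    public static void main(String[] args) {
        // A result id generated the same way as in executeQueryInternal.
        String resultId = UUID.randomUUID().toString();

        // 36 characters with hyphens, so it cannot be read as a job id.
        System.out.println(resultId.length());           // 36
        System.out.println(looksLikeJobIdHex(resultId)); // false

        // Made-up job id standing in for the one reported at submission.
        ResultJobRegistry registry = new ResultJobRegistry();
        registry.register(resultId, "0123456789abcdef0123456789abcdef");
        System.out.println(registry.jobIdFor(resultId).orElse("unknown"));
    }
}
```

With such a lookup, {{cancelQueryInternal}} could resolve the real job id from the {{resultId}} instead of trying to parse the {{resultId}} itself.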





  was:
In the SQL client, the CLI client cancels a query through the {{void 
cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. 
However, the {{resultId}} is a random UUID, not the job ID, so the CLI client 
can't cancel a running job.


Related code in {{LocalExecutor}}:
{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
        ......

        // store the result with a unique id
        final String resultId = UUID.randomUUID().toString();
        resultStore.storeResult(resultId, result);

        ......

        // create execution
        final ProgramDeployer deployer = new ProgramDeployer(
                configuration, jobName, pipeline);

        // start result retrieval
        result.startRetrieval(deployer);

        return new ResultDescriptor(
                        resultId,
                        removeTimeAttributes(table.getSchema()),
                        result.isMaterialized());
}



private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
        ......

        // stop Flink job
        try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
                ClusterClient<T> clusterClient = null;
                try {
                        // retrieve existing cluster
                        clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
                        try {
                                // ======== cancel job through resultId =======
                                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
                        } catch (Throwable t) {
                                // the job might have finished earlier
                        }
                } catch (Exception e) {
                        throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
                } finally {
                        try {
                                if (clusterClient != null) {
                                        clusterClient.close();
                                }
                        } catch (Exception e) {
                                // ignore
                        }
                }
        } catch (SqlExecutionException e) {
                throw e;
        } catch (Exception e) {
                throw new SqlExecutionException("Could not locate a cluster.", e);
        }
}
{code}






> SQL client can't cancel flink job
> ---------------------------------
>
>                 Key: FLINK-15669
>                 URL: https://issues.apache.org/jira/browse/FLINK-15669
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: godfrey he
>            Priority: Major
>             Fix For: 1.10.0
>
>
> In the SQL client, the CLI client performs the cancel-query operation through 
> the {{void cancelQuery(String sessionId, String resultId)}} method in 
> {{Executor}}. However, the {{resultId}} is a random UUID, not the job ID, so 
> the CLI client can't cancel a running job.
> Related code in {{LocalExecutor}}:
> {code:java}
> private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
>       ......
>       // store the result with a unique id
>       final String resultId = UUID.randomUUID().toString();
>       resultStore.storeResult(resultId, result);
>       ......
>       // create execution
>       final ProgramDeployer deployer = new ProgramDeployer(
>               configuration, jobName, pipeline);
>       // start result retrieval
>       result.startRetrieval(deployer);
>       return new ResultDescriptor(
>                       resultId,
>                       removeTimeAttributes(table.getSchema()),
>                       result.isMaterialized());
> }
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
>       ......
>       // stop Flink job
>       try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
>               ClusterClient<T> clusterClient = null;
>               try {
>                       // retrieve existing cluster
>                       clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
>                       try {
>                               // ======== cancel job through resultId =======
>                               clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
>                       } catch (Throwable t) {
>                               // the job might have finished earlier
>                       }
>               } catch (Exception e) {
>                       throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
>               } finally {
>                       try {
>                               if (clusterClient != null) {
>                                       clusterClient.close();
>                               }
>                       } catch (Exception e) {
>                               // ignore
>                       }
>               }
>       } catch (SqlExecutionException e) {
>               throw e;
>       } catch (Exception e) {
>               throw new SqlExecutionException("Could not locate a cluster.", e);
>       }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
