morhidi commented on PR #237:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/237#issuecomment-1137141491

   > > The detailed JOB state:
   > > ```
   > > {
   > >   "jobs": [
   > >     {
   > >       "jid": "00000000000000000000000000000000",
   > >       "name": "State machine job",
   > >       "state": "RUNNING",
   > >       "start-time": 1653471451258,
   > >       "end-time": -1,
   > >       "duration": 1666751,
   > >       "last-modification": 1653472755540,
   > >       "tasks": {
   > >         "total": 4,
   > >         "created": 0,
   > >         "scheduled": 0,
   > >         "deploying": 0,
   > >         "running": 4,
   > >         "finished": 0,
   > >         "canceling": 0,
   > >         "canceled": 0,
   > >         "failed": 0,
   > >         "reconciling": 0,
   > >         "initializing": 0
   > >       }
   > >     }
   > >   ]
   > > }
   > > ```
   > > 
   > > 
   > >     
   > >       
   > >     
   > > 
   > >       
   > >     
   > > 
   > >     
   > >   
   > > The Rest Client doesn't seem to map everything 
https://github.com/apache/flink/blob/ffca99d5f369c21182b729ce5abe082ffa40d579/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L611
   > > We can create our own listJobs method, and we could have an accurate 
state. @wangyang0918 WDYT?
   > 
   > @morhidi IMO, the `listJobs` interface should return the task execution 
states. With the task execution states, whether the job is actually running 
could be checked. In addtion, I'm confusing how to create own `listJobs` 
method. Do you mean that introduces `listJobsForOperator` interface in 
`ClusterClient`?
   
   No, we could do something like this in the `FlinkService`:
   
   ```
    public Collection<JobDetails> getJobDetails(Configuration conf) throws 
Exception {
           try (RestClusterClient<String> clusterClient =
                        (RestClusterClient<String>) getClusterClient(conf)) {
   
               var response =
                       clusterClient
                               .sendRequest(
                                       JobsOverviewHeaders.getInstance(),
                                       EmptyMessageParameters.getInstance(),
                                       EmptyRequestBody.getInstance())
                               .get(
                                       configManager
                                               .getOperatorConfiguration()
                                               .getFlinkClientTimeout()
                                               .toSeconds(),
                                       TimeUnit.SECONDS);
   
               return response.getJobs();
           }
       }
   ```
   
   and use it instead of the current `Collection<JobStatusMessage> 
listJobs(Configuration conf)`. There's no reason to wait for the RestClient 
changes from the Flink runtime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to