senlizishi opened a new issue, #2576:
URL: https://github.com/apache/incubator-streampark/issues/2576

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/incubator-streampark/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### Java Version
   
   _No response_
   
   ### Scala Version
   
   2.12.x
   
   ### StreamPark Version
   
   2.0.0
   
   ### Flink Version
   
   1.16.1
   
   ### deploy mode
   
   None
   
   ### What happened
   
   When deploying FlinkSQL jobs in K8S Application mode, a job sometimes stays 
in the `INITIALIZING` state in StreamPark even though it is already `RUNNING` 
on K8S. I checked the network and the K8S cluster resources, and both look 
fine.
   
   Later, I debugged the code and found that the `JobStatusCache` class 
maintains a `cache` for job statuses, but it **does not set a cache 
timeout**.
   
   Next, `FlinkJobStatusWatcher` continuously polls the job status and pushes 
status update events through `eventBus`.
   
   As shown in the following code:
   ```scala
     if (latest == null || latest.jobState != jobState.jobState ||
       latest.jobId != jobState.jobId) {
       // put job status to cache
       watchController.jobStatuses.put(trackId, jobState)
       // set jobId to trackIds
       watchController.trackIds.update(trackId)
       eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
     }
   ```
   
   When the job starts normally, the `cache` is updated to `RUNNING`, and the 
watcher keeps obtaining the `RUNNING` status. Because the cache entry never 
expires, if another code path writes `INITIALIZING` directly to the database 
without clearing the cache, the condition above never detects a change, no 
further event is posted, and the job stays in the `INITIALIZING` state instead 
of automatically changing to `RUNNING`.
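
The sequence described above can be reproduced in isolation. The following is only a minimal sketch: `StaleCacheDemo`, `JobStatus`, `onPoll`, and the two in-memory maps are hypothetical stand-ins for illustration, not the actual StreamPark classes. It assumes the event handler is the only thing that writes the polled state back to the database.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; these are not the StreamPark classes.
object StaleCacheDemo {
  final case class JobStatus(jobId: String, jobState: String)

  // Never expires, like the cache described in this issue.
  val cache = mutable.Map.empty[String, JobStatus]
  // trackId -> state stored in the database (what the user sees).
  val database = mutable.Map.empty[String, String]

  // Same guard as the watcher snippet: act only when the polled status
  // differs from the cached one. Returns true when an event is posted.
  def onPoll(trackId: String, polled: JobStatus): Boolean = {
    val latest = cache.get(trackId).orNull
    if (latest == null || latest.jobState != polled.jobState || latest.jobId != polled.jobId) {
      cache.put(trackId, polled)
      // Stands in for the event handler that persists the new state.
      database.put(trackId, polled.jobState)
      true
    } else false
  }

  def main(args: Array[String]): Unit = {
    val running = JobStatus("job-1", "RUNNING")
    println(onPoll("t1", running))     // true: first poll posts an event
    database.put("t1", "INITIALIZING") // another code path resets the database only
    println(onPoll("t1", running))     // false: cache still says RUNNING, no event
    println(database("t1"))            // INITIALIZING
  }
}
```

Every later poll returns the same `RUNNING` status, so in this sketch the database value is never corrected.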
   
   
   My suggestion: set the `cache` timeout in `JobStatusCache` to 30 seconds, 
so that entries expire 30 seconds after they are written.
   ```scala
    private[this] lazy val cache: Cache[CacheKey, JobStatusCV] =
      Caffeine.newBuilder.expireAfterWrite(30, TimeUnit.SECONDS).build()
   ```
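
To illustrate the effect of a write-based timeout without pulling in Caffeine, here is a rough sketch using a hand-rolled TTL map and a manually advanced clock. `TtlCache`, `TtlCacheDemo`, and `onPoll` are hypothetical names; the real change would rely on Caffeine's `expireAfterWrite` as proposed above, and this stand-in only approximates its behavior.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cache with a 30-second expire-after-write policy.
final class TtlCache[K, V](ttlMillis: Long, now: () => Long) {
  // Each entry stores the value together with the time it was written.
  private val entries = mutable.Map.empty[K, (V, Long)]

  def put(key: K, value: V): Unit = entries.put(key, (value, now()))

  // Returns the value only if it was written less than ttlMillis ago.
  def getIfPresent(key: K): Option[V] = entries.get(key) match {
    case Some((value, writtenAt)) if now() - writtenAt < ttlMillis => Some(value)
    case Some(_) => entries.remove(key); None
    case None => None
  }
}

object TtlCacheDemo {
  var clock = 0L // fake clock in milliseconds, advanced manually
  val cache = new TtlCache[String, String](30000L, () => clock)

  // Returns true (an event would be posted) when the cached state is
  // missing, expired, or different from the polled state.
  def onPoll(trackId: String, state: String): Boolean =
    if (cache.getIfPresent(trackId).contains(state)) false
    else { cache.put(trackId, state); true }

  def main(args: Array[String]): Unit = {
    println(onPoll("t1", "RUNNING")) // true: nothing cached yet
    clock += 10000L
    println(onPoll("t1", "RUNNING")) // false: entry is 10 s old, still valid
    clock += 30000L
    println(onPoll("t1", "RUNNING")) // true: entry is 40 s old, expired
  }
}
```

With this approach an unchanged status is re-posted roughly every 30 seconds, so a database value that was overwritten elsewhere should be corrected within about one timeout period plus one polling interval.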
   
   ### Error Exception
   
   _No response_
   
   ### Screenshots
   
   
   
![image](https://user-images.githubusercontent.com/24784791/230321410-a7c3bc8b-aed2-4326-8a13-7d5ffe69cfb8.png)
   
   
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

