healchow opened a new pull request, #12074:
URL: https://github.com/apache/inlong/pull/12074
[INLONG-12073][Manager] Optimize Pulsar message query with thread pool and
proper error handling
Fixes #12073
### Motivation
Currently, when querying MQ messages from multiple Pulsar clusters, there
are several issues:
1. **No thread pool management**: Message queries are executed without
proper thread pool management, which may lead to resource exhaustion under high
concurrency.
2. **No graceful handling for task rejection**: When too many concurrent
requests come in, the system doesn't properly handle the
`RejectedExecutionException` and doesn't provide a user-friendly error response.
3. **No task cancellation mechanism**: When task submission fails,
previously submitted tasks continue to run unnecessarily.
4. **No interruption support**: Long-running IO operations cannot be
cancelled when the request is aborted.
### Modifications
1. **Add a dedicated thread pool**: Use `ThreadPoolTaskExecutor` with
configurable core/max pool size and queue capacity for message query tasks.
2. **Add proper error handling**:
- Add a new error code `PULSAR_QUERY_REJECTED(2610)` for task rejection
scenarios
- Return proper error code and message to the caller: `{"success": false,
"errCode": 2610, "errMsg": "Query task rejected: too many concurrent requests,
please try again later"}`
3. **Implement task cancellation**: When `RejectedExecutionException`
occurs, cancel all previously submitted tasks to free up resources.
4. **Add interruption checks**: Check
`Thread.currentThread().isInterrupted()` before and after IO operations to
support task cancellation.
### Verifying this change
- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as:
*(please describe tests)*
- [x] This change added tests and can be verified as follows:
- *Added unit tests QueryLatestMessagesRunnableTest to verified this
change*
### Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs
/ not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up
issue for adding the documentation
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]