wangpeix opened a new issue, #7752:
URL: https://github.com/apache/inlong/issues/7752
### What happened
When remove the file data source on databoard. Then add one file data source
in new inlong group on databoard. Agent occur the following exception:
```java
023-03-31 22:06:50.408 -ERROR [ task-job_11_2-6]
o.a.i.a.c.t.TaskWrapper :211 error while running wrapper
java.util.concurrent.RejectedExecutionException: Task
org.apache.inlong.agent.plugin.sinks.PulsarSink$$Lambda$208/701747533@5f91f266
rejected from java.util.concurrent.ThreadPoolExecutor@43d6a181[Shutting down,
pool size = 6, active threads = 6, queued tasks = 0, completed tasks = 2]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
~[?:1.8.0_151]
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
~[?:1.8.0_151]
at
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
~[?:1.8.0_151]
at
org.apache.inlong.agent.plugin.sinks.PulsarSink.init(PulsarSink.java:153)
~[agent-plugins-1.7.0-SNAPSHOT.jar:1.7.0-SNAPSHOT]
at org.apache.inlong.agent.core.task.Task.init(Task.java:76)
~[agent-core-1.7.0-SNAPSHOT.jar:1.7.0-SNAPSHOT]
at
org.apache.inlong.agent.core.task.TaskWrapper.run(TaskWrapper.java:203)
~[agent-core-1.7.0-SNAPSHOT.jar:1.7.0-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_151]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_151]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_151]
```
### What you expected to happen
Agent `PulsarSink` thread pool is global, like this:
```java
public class PulsarSink extends AbstractSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerFactory.class);
private static final AtomicInteger CLIENT_INDEX = new AtomicInteger(0);
private static final ExecutorService EXECUTOR_SERVICE = new
ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new
AgentThreadFactory("PulsarSink"));
```
When `PulsarSink#destroy`, the thread pool will be shut down.
```java
public void destroy() {
...
EXECUTOR_SERVICE.shutdown();
...
}
```
Then `PulsarSink#init` submit thread to the `shutdown thread pool` may
cause the problem.
```java
public void init(JobProfile jobConf) {
...
EXECUTOR_SERVICE.execute(sendDataThread());
...
}
```
### How to reproduce
Remove the file data source on databoard. Then add one file data source in
new inlong group on databoard. Agent occur the exception.
### Environment
_No response_
### InLong version
master
### InLong Component
InLong Agent
### Are you willing to submit PR?
- [ ] Yes, I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://www.apache.org/foundation/policies/conduct)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]