lkmxsxd opened a new issue, #72:
URL: https://github.com/apache/rocketmq-eventbridge/issues/72

   ## Background&Motivation
   
   ### What needs to be done
   
   * Concurrent push
   * Each topic is allocated a cache queue, and messages can be pushed randomly 
from a topic queue or queried to push 
messages.`ConcurrentHashMap<String,BlockingQueue<ConnectRecord>>`
   * Set the maximum execution time for tasks, and sinkTask beyond the allowed 
execution time should yield CPU time. At the same time, interrupt tasks if 
necessary. Put the ConnectRecord into a low-priority queue and consider 
discarding it after reaching the retry threshold. The consumption of 
consumption sites can be processed at this time.
   
   
   
   ### Why do this
   
   1. Improving the push ability of messages.
   2. Prevent some tasks from starving, because of other task need to due the 
data flow too large.
   3. SinkTask  consume ability is insufficient.
   
   
   
   ### What benefits are bred
   
   1. Improved message distribution capabilities
   2. Message distribution per topic is not affected by traffic
   3. Avoid blockage caused by slow sinkTask processing messages
   
   
   
   ### How to achieve it
   
   1. Adding a thread pool to `EventTargetPusher` to execute sinkTask can refer 
to Connect's `WorkerTask'`implementation
   2. Redefine the data structures for 
``eventRecord`,`targetQueue`,`targetQueue`in` CirculatorContext`,Consider 
using`ConcurrentHashMap<String,BlockingQueue<ConnectRecord>>`
   3. Implement polling for topic message fetching and checking of task 
execution status in the 
`org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher#run` 
method.
      1. When there are no messages in the high-priority queue, get the message 
push from the low-priority queue
   
   ```java
   // cache the committed to the thread pool task and check run state and 
interrupt it.
   private final Map<WorkerTask, TaskFuture> taskToFutureMap = new 
ConcurrentHashMap<>();
   public void run() {
           while (!stopped) {
             // 任务执行状态检查
               checkSubmittedTasks();
               ConnectRecord targetRecord = circulatorContext.takeTargetMap();
               if (Objects.isNull(targetRecord)) {
                   targetRecord = circulatorContext.takeLowTargetMap();
               }
   
               if (Objects.isNull(targetRecord)) {
                   logger.info("current target pusher is empty");
                   this.waitForRunning(1000);
                   continue;
               }
               if (logger.isDebugEnabled()) {
                   logger.debug("start push content by pusher - {}", 
JSON.toJSONString(targetRecord));
               }
   
               Map<String, SinkTask> latestTaskMap = 
circulatorContext.getPusherTaskMap();
               String runnerName = 
targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
               SinkTask sinkTask = latestTaskMap.get(runnerName);
   
               WorkerSinkTask workerSinkTask = new WorkerSinkTask(sinkTask, 
targetRecord);
               //
               Future future = executorService.submit(workerSinkTask);
               TaskFuture taskFuture = new TaskFuture(future, 
System.currentTimeMillis());
               taskToFutureMap.put(workerSinkTask, taskFuture);
           }
       }
   ```
   
   #### checkSubmittedTasks
   
   ```java
       private void checkSubmittedTasks() {
           for (Map.Entry<WorkerTask, TaskFuture> entry : 
taskToFutureMap.entrySet()) {
               WorkerTask workerTask = entry.getKey();
               TaskFuture taskFuture = entry.getValue();
               Long currentTimeMillis = System.currentTimeMillis();
               TaskState state = workerTask.getTaskState();
               Future future = taskFuture.getFuture();
               // exited normally
               switch (state) {
                   case NEW:
                       // 任务提交但未被执行
                       break;
                   case STARTED:
                       if (currentTimeMillis - workerTask.getStartTimestamp() > 
circulatorContext.getMaxExecuteTimeoutMills()) {
                           workerTask.transitionTo(TaskState.TIMEOUT);
                                // 如果执行超时,中断此任务
                           future.cancel(true);
                           taskToFutureMap.remove(workerTask);
                           // 将ConnectRecord放入低优先级队列
                         
circulatorContext.offerLowTargetTaskQueue(workerTask.getConnectRecord());
                       }
                       break;
   
                   case ERROR:
                       taskFuture.getFuture().cancel(true);
                       taskToFutureMap.remove(workerTask);
                       // put to lower queue
                       
circulatorContext.offerLowTargetTaskQueue(workerTask.getConnectRecord());
                       break;
                   case TIMEOUT:
                       taskToFutureMap.remove(workerTask);
                       // TODO 可以考虑丢弃,或者进入死信,或者持久化到DB用于审计或者运维手动推送
                       break;
                   case COMPLETED:
                       taskToFutureMap.remove(workerTask);
                       break;
                   default:
                       logger.error("[BUG] Illegal State in when checking 
stopping tasks, {} is in {} state",
                               workerTask, state);
               }
           }
       }
   ```
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to