complone opened a new issue, #71:
URL: https://github.com/apache/rocketmq-eventbridge/issues/71

   document: https://shimo.im/file-invite/Ny5aM7dSkTmgr7KULSqMx47jGpYb6/ 
   
   ### Motivation
   The current path that pushes data from the event source to the event target has to
   handle a large volume of data. We may need to design a mechanism for reading event
   source data and pushing to the event target with multiple threads, so that we can
   achieve sufficient concurrency under heavy consumption load and keep the pipeline
   observable when backpressure occurs.
   
   #### Design
   
   Producer-consumer approach
   
   The producer side continuously fetches source data and puts it into a blocking queue.
   The BlockingQueue has a bounded capacity: if the consumer side cannot drain it in time,
   the producer is blocked and cannot add more data until space frees up. The producer
   detects the end of the stream when the last fetch returns null, which indicates there
   is no more source data.
   The consumer side takes source data out of the blocking queue for processing. Most of
   the time is spent in the actual processing logic, not in taking data from the queue,
   so a single consumer thread is enough. After taking a record, the consumer hands it to
   a thread pool executor (ThreadPoolTaskExecutor), which controls the execution of the
   concrete tasks. The ThreadPoolTaskExecutor also has its own task queue; to prevent
   that queue from growing without bound and exhausting memory, it has an upper limit,
   and the consumer thread only submits tasks to the ThreadPoolTaskExecutor while the
   queue size is below that limit.
   
   A task manager is then needed to coordinate the production and consumption logic of
   EventTargetPusher, and to create a dedicated blocking queue for each kind of task
   (tasks can have high, medium, or low priority), as sketched below.
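   For illustration, here is a minimal sketch of what those per-priority queues could
   look like; the type and class names (TaskPriority, PriorityQueues) are hypothetical
   and not part of the existing code:
   
   ```
   import java.util.EnumMap;
   import java.util.Map;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   
   // Hypothetical sketch: one bounded blocking queue per task priority.
   public enum TaskPriority { HIGH, MEDIUM, LOW }
   
   public class PriorityQueues {
       private final Map<TaskPriority, BlockingQueue<Object>> queues = new EnumMap<>(TaskPriority.class);
   
       public PriorityQueues(int capacityPerQueue) {
           for (TaskPriority priority : TaskPriority.values()) {
               // ArrayBlockingQueue blocks the producer when the queue is full,
               // which is the backpressure behavior described above.
               queues.put(priority, new ArrayBlockingQueue<>(capacityPerQueue));
           }
       }
   
       public BlockingQueue<Object> get(TaskPriority priority) {
           return queues.get(priority);
       }
   }
   ```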
   
   
   TaskControlManager
   ```
   @Autowired
   private DataServiceDispatch dataServiceDispatch;
   private static final Logger logger = LoggerFactory.getLogger(TaskControlManager.class);
   
   public void doHandle(DataContextParam dataContextParam) {
       if (!DataUtils.switchOpen) {
           return;
       }
       // Producer side: keeps pulling source data until the data service returns null.
       new Thread(new Runnable() {
           @Override
           public void run() {
               Object object = null;
               do {
                   // Throttle the producer while the consumer executor's queue is above its limit.
                   while (DataUtils.consumerExecutor.getCurrentQueueSize() > DataUtils.consumerCurrentQueueSizeLimit) {
                       try {
                           Thread.sleep(100);
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                           return;
                       }
                   }
                   object = dataServiceDispatch.getDateService(dataContextParam.getBizType()).querySourceData();
                   if (object != null) {
                       try {
                           // put() blocks when the task queue is full, which backpressures the producer.
                           DataUtils.taskQueue.put(object);
                       } catch (Exception e) {
                           logger.error("set queue error!", e);
                       }
                   } else {
                       // No more source data: reset the end flag and turn the start switch off.
                       dataServiceDispatch.getDateService(dataContextParam.getBizType()).resetSign();
                       DataUtils.switchOpen = false;
                   }
               } while (object != null);
           }
       }).start();
       // Consumer side: takes records from the queue and hands them to the thread pool executor.
       new Thread(new Runnable() {
           @Override
           public void run() {
               while (true) {
                   Object object = null;
                   try {
                       // Throttle while the executor's own task queue is above its limit.
                       while (DataUtils.consumerExecutor.getCurrentQueueSize() > DataUtils.consumerCurrentQueueSizeLimit) {
                           Thread.sleep(100);
                       }
                       object = DataUtils.taskQueue.take();
                   } catch (Exception e) {
                       logger.error("take queue error!", e);
                   }
                   if (object != null) {
                       DataUtils.consumerExecutor.execute(new ConsumerTask(object, dataContextParam));
                   }
               }
           }
       }).start();
   }
   
   // Consumer task: delegates the actual processing to the business-specific data service.
   private class ConsumerTask implements Runnable {
       private Object object = null;
       private DataContextParam dataContextParam = null;
   
       public ConsumerTask(Object object, DataContextParam dataContextParam) {
           this.object = object;
           this.dataContextParam = dataContextParam;
       }
   
       @Override
       public void run() {
           dataServiceDispatch.getDateService(dataContextParam.getBizType()).migrationData(object);
       }
   }
    ```
   
   DataUtils
   ```
   public class DataUtils {
       // Global start/stop switch for the push task (volatile so worker threads see updates)
       public static volatile boolean switchOpen = false;
       // Number of source data records read
       public static AtomicInteger sourceDataSize = new AtomicInteger(0);
       // Number of records processed successfully
       public static AtomicInteger handleSuccessSize = new AtomicInteger(0);
       // Number of records whose processing failed
       public static AtomicInteger handleFailSize = new AtomicInteger(0);
       // Bounded task queue shared between the producer and the consumer thread
       public static BlockingQueue<Object> taskQueue = new ArrayBlockingQueue<Object>(5);
       // Executor that runs the consumer tasks
       public static ThreadPoolTaskExecutor consumerExecutor = null;
       // Upper limit of the consumer executor's queue length
       public static int consumerCurrentQueueSizeLimit = 100;
   
       public static void setConsumerExecutor(ThreadPoolTaskExecutor consumerExecutor) {
           DataUtils.consumerExecutor = consumerExecutor;
           DataUtils.consumerExecutor.setMaxPoolSize(30);
           DataUtils.consumerExecutor.setCorePoolSize(30);
       }
   
       public static void setConsumerCurrentQueueSizeLimit(int consumerCurrentQueueSizeLimit) {
           DataUtils.consumerCurrentQueueSizeLimit = consumerCurrentQueueSizeLimit;
       }
   
       public static void setCustomerMaxThreadSize(int maxThreadSize) {
           DataUtils.consumerExecutor.setMaxPoolSize(maxThreadSize);
       }
   
       public static void setCustomerCorePoolSize(int coreThreadSize) {
           DataUtils.consumerExecutor.setCorePoolSize(coreThreadSize);
       }
   
       public static void resetRecordCount() {
           sourceDataSize = new AtomicInteger(0);
           handleSuccessSize = new AtomicInteger(0);
           handleFailSize = new AtomicInteger(0);
       }
   }
   
   ```
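   For completeness, one way the executor could be wired into DataUtils at application
   startup; this is only a hedged sketch, and the configuration class name is
   illustrative:
   
   ```
   import org.springframework.context.annotation.Bean;
   import org.springframework.context.annotation.Configuration;
   
   @Configuration
   public class ConsumerExecutorConfig {
       @Bean
       public ThreadPoolTaskExecutor consumerExecutor() {
           ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
           executor.setThreadNamePrefix("event-target-pusher-");
           executor.setQueueCapacity(200); // bounded executor queue
           // Register the executor with DataUtils, which also applies the default core/max pool sizes.
           // Spring calls afterPropertiesSet() -> initialize() once the bean is fully configured.
           DataUtils.setConsumerExecutor(executor);
           return executor;
       }
   }
   ```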
   
   
   Custom thread pool ThreadPoolTaskExecutor
   ```
   public class ThreadPoolTaskExecutor extends CustomizableThreadFactory 
implements ExecutorService, SchedulingTaskExecutor, Executor, BeanNameAware, 
InitializingBean, DisposableBean {
       protected final Logger           logger                           = 
LoggerFactory.getLogger(getClass());
       private final Object             poolSizeMonitor                  = new 
Object();
       private int                      corePoolSize                     = 1;
       private int                      maxPoolSize                      = 
Integer.MAX_VALUE;
       private int                      keepAliveSeconds                 = 60;
       private boolean                  allowCoreThreadTimeOut           = 
false;
       private int                      queueCapacity                    = 
Integer.MAX_VALUE;
       private ThreadFactory            threadFactory                    = this;
       private RejectedExecutionHandler rejectedExecutionHandler         = new 
ThreadPoolExecutor.CallerRunsPolicy();
       private boolean                  waitForTasksToCompleteOnShutdown = 
false;
       private boolean                  threadNamePrefixSet              = 
false;
       private String                   beanName;
       private ThreadPoolExecutor       threadPoolExecutor;
       /**
        * Set the ThreadPoolExecutor's core pool size. Default is 1.
        * <p>
        * <b>This setting can be modified at runtime, for example through 
JMX.</b>
        */
       public void setCorePoolSize(int corePoolSize) {
           synchronized (this.poolSizeMonitor) {
               this.corePoolSize = corePoolSize;
               if (this.threadPoolExecutor != null) {
                   this.threadPoolExecutor.setCorePoolSize(corePoolSize);
               }
           }
       }
       /**
        * Return the ThreadPoolExecutor's core pool size.
        */
       public int getCorePoolSize() {
           synchronized (this.poolSizeMonitor) {
               return this.corePoolSize;
           }
       }
       /**
        * Set the ThreadPoolExecutor's maximum pool size. Default is 
<code>Integer.MAX_VALUE</code>.
        * <p>
        * <b>This setting can be modified at runtime, for example through 
JMX.</b>
        */
       public void setMaxPoolSize(int maxPoolSize) {
           synchronized (this.poolSizeMonitor) {
               this.maxPoolSize = maxPoolSize;
               if (this.threadPoolExecutor != null) {
                   this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
               }
           }
       }
       /**
        * Return the ThreadPoolExecutor's maximum pool size.
        */
       public int getMaxPoolSize() {
           synchronized (this.poolSizeMonitor) {
               return this.maxPoolSize;
           }
       }
       /**
        * Set the ThreadPoolExecutor's keep-alive seconds. Default is 60.
        * <p>
        * <b>This setting can be modified at runtime, for example through 
JMX.</b>
        */
       public void setKeepAliveSeconds(int keepAliveSeconds) {
           synchronized (this.poolSizeMonitor) {
               this.keepAliveSeconds = keepAliveSeconds;
               if (this.threadPoolExecutor != null) {
                   this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, 
TimeUnit.SECONDS);
               }
           }
       }
       /**
        * Return the ThreadPoolExecutor's keep-alive seconds.
        */
       public int getKeepAliveSeconds() {
           synchronized (this.poolSizeMonitor) {
               return this.keepAliveSeconds;
           }
       }
       /**
        * Specify whether to allow core threads to time out. This enables 
dynamic growing and shrinking even in combination
        * with a non-zero queue (since the max pool size will only grow once 
the queue is full).
        * <p>
        * Default is "false". Note that this feature is only available on Java 
6 or above. On Java 5, consider switching to
        * the backport-concurrent version of ThreadPoolTaskExecutor which also 
supports this feature.
        * 
        * @see 
java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
        */
       public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
           this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
       }
       /**
        * Set the capacity for the ThreadPoolExecutor's BlockingQueue. Default 
is <code>Integer.MAX_VALUE</code>.
        * <p>
        * Any positive value will lead to a LinkedBlockingQueue instance; any 
other value will lead to a SynchronousQueue
        * instance.
        * 
        * @see java.util.concurrent.LinkedBlockingQueue
        * @see java.util.concurrent.SynchronousQueue
        */
       public void setQueueCapacity(int queueCapacity) {
           this.queueCapacity = queueCapacity;
       }
       /**
        * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
        * <p>
        * Default is this executor itself (i.e. the factory that this executor 
inherits from). See
        * {@link org.springframework.util.CustomizableThreadCreator}'s javadoc 
for available bean properties.
        * 
        * @see #setThreadPriority
        * @see #setDaemon
        */
       public void setThreadFactory(ThreadFactory threadFactory) {
           this.threadFactory = (threadFactory != null ? threadFactory : this);
       }
       /**
        * Set the RejectedExecutionHandler to use for the ThreadPoolExecutor. 
Default is the ThreadPoolExecutor's default
        * abort policy.
        * 
        * @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        */
       public void setRejectedExecutionHandler(RejectedExecutionHandler 
rejectedExecutionHandler) {
           this.rejectedExecutionHandler = (rejectedExecutionHandler != null ? 
rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
       }
       /**
        * Set whether to wait for scheduled tasks to complete on shutdown.
        * <p>
        * Default is "false". Switch this to "true" if you prefer fully 
completed tasks at the expense of a longer shutdown
        * phase.
        * 
        * @see java.util.concurrent.ThreadPoolExecutor#shutdown()
        * @see java.util.concurrent.ThreadPoolExecutor#shutdownNow()
        */
       public void setWaitForTasksToCompleteOnShutdown(boolean 
waitForJobsToCompleteOnShutdown) {
           this.waitForTasksToCompleteOnShutdown = 
waitForJobsToCompleteOnShutdown;
       }
       @Override
       public void setThreadNamePrefix(String threadNamePrefix) {
           super.setThreadNamePrefix(threadNamePrefix);
           this.threadNamePrefixSet = true;
       }
       @Override
       public void setBeanName(String name) {
           this.beanName = name;
       }
       /**
        * Calls <code>initialize()</code> after the container applied all 
property values.
        * 
        * @see #initialize()
        */
       @Override
       public void afterPropertiesSet() {
           initialize();
       }
       /**
        * Creates the BlockingQueue and the ThreadPoolExecutor.
        * 
        * @see #createQueue
        */
       @SuppressWarnings("unchecked")
       public void initialize() {
           if (logger.isInfoEnabled()) {
               logger.info("Initializing ThreadPoolExecutor" + (this.beanName 
!= null ? " '" + this.beanName + "'" : ""));
           }
           if (!this.threadNamePrefixSet && this.beanName != null) {
               setThreadNamePrefix(this.beanName + "-");
           }
           BlockingQueue queue = createQueue(this.queueCapacity);
           this.threadPoolExecutor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize,
               this.keepAliveSeconds, TimeUnit.SECONDS, queue, this.threadFactory, this.rejectedExecutionHandler);
           if (this.allowCoreThreadTimeOut) {
               this.threadPoolExecutor.allowCoreThreadTimeOut(true);
           }
       }
       /**
        * Create the BlockingQueue to use for the ThreadPoolExecutor.
        * <p>
        * A LinkedBlockingQueue instance will be created for a positive 
capacity value; a SynchronousQueue else.
        * 
        * @param queueCapacity the specified queue capacity
        * @return the BlockingQueue instance
        * @see java.util.concurrent.LinkedBlockingQueue
        * @see java.util.concurrent.SynchronousQueue
        */
       @SuppressWarnings("unchecked")
       protected BlockingQueue createQueue(int queueCapacity) {
           if (queueCapacity > 0) {
               return new LinkedBlockingQueue(queueCapacity);
           } else {
               return new SynchronousQueue();
           }
       }
       /**
        * Return the underlying ThreadPoolExecutor for native access.
        * 
        * @return the underlying ThreadPoolExecutor (never <code>null</code>)
        * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't 
been initialized yet
        */
       public ThreadPoolExecutor getThreadPoolExecutor() throws 
IllegalStateException {
           Assert.state(this.threadPoolExecutor != null, 
"ThreadPoolTaskExecutor not initialized");
           return this.threadPoolExecutor;
       }
       /**
        * Implementation of both the JDK 1.5 Executor interface and the Spring 
TaskExecutor interface, delegating to the
        * ThreadPoolExecutor instance.
        * 
        * @see java.util.concurrent.Executor#execute(Runnable)
        * @see org.springframework.core.task.TaskExecutor#execute(Runnable)
        */
       @Override
       public void execute(Runnable task) {
           Executor executor = getThreadPoolExecutor();
           try {
               executor.execute(task);
           } catch (RejectedExecutionException ex) {
               throw new TaskRejectedException("Executor [" + executor + "] did 
not accept task: " + task, ex);
           }
       }
       /**
        * This task executor prefers short-lived work units.
        */
       @Override
       public boolean prefersShortLivedTasks() {
           return true;
       }
       /**
        * Return the current pool size.
        * 
        * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
        */
       public int getPoolSize() {
           return getThreadPoolExecutor().getPoolSize();
       }
       /**
        * Return the number of currently active threads.
        * 
        * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
        */
       public int getActiveCount() {
           return getThreadPoolExecutor().getActiveCount();
       }
       /**
        * Return the configured capacity of this thread pool's task queue.
        */
       public int getQueueSize() {
           return queueCapacity;
       }
       /**
        * Return the current size of this thread pool's task queue.
        */
       public int getCurrentQueueSize() {
           return getThreadPoolExecutor().getQueue().size();
       }
       /**
        * Calls <code>shutdown</code> when the BeanFactory destroys the task 
executor instance.
        * 
        * @see #shutdown()
        */
       @Override
       public void destroy() {
           shutdown();
       }
       /**
        * Perform a shutdown on the ThreadPoolExecutor.
        * 
        * @see java.util.concurrent.ThreadPoolExecutor#shutdown()
        */
       public void shutdown() {
           if (logger.isInfoEnabled()) {
               logger.info("Shutting down ThreadPoolExecutor" + (this.beanName 
!= null ? " '" + this.beanName + "'" : ""));
           }
           if (this.waitForTasksToCompleteOnShutdown) {
               this.threadPoolExecutor.shutdown();
           } else {
               this.threadPoolExecutor.shutdownNow();
           }
       }
       // ---------------- The methods below simply delegate to the underlying ThreadPoolExecutor ---------------- //
       @Override
       public List<Runnable> shutdownNow() {
           return getThreadPoolExecutor().shutdownNow();
       }
       @Override
       public boolean isShutdown() {
           return getThreadPoolExecutor().isShutdown();
       }
       @Override
       public boolean isTerminated() {
           return getThreadPoolExecutor().isTerminated();
       }
       @Override
       public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
           return getThreadPoolExecutor().awaitTermination(timeout, unit);
       }
       @Override
       public <T> Future<T> submit(Callable<T> task) {
           return getThreadPoolExecutor().submit(task);
       }
       @Override
       public <T> Future<T> submit(Runnable task, T result) {
           return getThreadPoolExecutor().submit(task, result);
       }
       @Override
       public Future<?> submit(Runnable task) {
           return getThreadPoolExecutor().submit(task);
       }
       @Override
       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
           return getThreadPoolExecutor().invokeAll(tasks);
       }
       @Override
       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
           throws InterruptedException {
           return getThreadPoolExecutor().invokeAll(tasks, timeout, unit);
       }
       @Override
       public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
           return getThreadPoolExecutor().invokeAny(tasks);
       }
       @Override
       public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException {
           return getThreadPoolExecutor().invokeAny(tasks, timeout, unit);
       }
       @Override
       public void execute(Runnable task, long startTimeout) {
           // The start timeout is not supported here; simply delegate to execute(Runnable).
           execute(task);
       }
   }
   ```
   
   When the thread pool occupancy is high, the queue limits and pool sizes can be
   adjusted at runtime, and the progress of the push tasks can be monitored, through
   the following HTTP endpoints:
   
   ```
   @Autowired
   private BusinessControlManager businessControlManager;
   /**
    * Start the task.
    * url: http://localhost:8091/task/start?bizType=test
    */
   @RequestMapping(value = "/task/start")
   public String startTask(HttpServletRequest request, HttpServletResponse response) throws Exception {
       try {
           if (DataUtils.switchOpen) {
               return "Task is already running, no need to start it again!";
           } else {
               DataUtils.switchOpen = true;
               DataUtils.resetRecordCount();
           }
           DataContextParam dataContextParam = new DataContextParam();
           String bizType = request.getParameter("bizType");
           dataContextParam.setBizType(bizType);
           businessControlManager.doHandle(dataContextParam);
       } catch (Exception e) {
           logger.error("[TaskController.startTask] error!", e);
           return "Failed to start the task";
       }
       return "Task started successfully";
   }
   /**
    * Adjust runtime parameters.
    * url: http://localhost:8091/task/adjust?consumerCurrentQueueSizeLimit=17&maxThreadSize=12&coreThreadSize=12
    */
   @RequestMapping(value = "/task/adjust")
   public Object paramAdjust(HttpServletRequest request, HttpServletResponse response) throws Exception {
       DataUtils.setConsumerCurrentQueueSizeLimit(Integer.valueOf(request.getParameter("consumerCurrentQueueSizeLimit")));
       int coreThreadSize = Integer.valueOf(request.getParameter("coreThreadSize"));
       int maxThreadSize = Integer.valueOf(request.getParameter("maxThreadSize"));
       // Note: the core pool size must not exceed the maximum pool size; otherwise threads
       // would be created and destroyed repeatedly, wasting system resources.
       if (coreThreadSize > maxThreadSize) {
           coreThreadSize = maxThreadSize;
       }
       DataUtils.setCustomerCorePoolSize(coreThreadSize);
       DataUtils.setCustomerMaxThreadSize(maxThreadSize);
       return "System parameters adjusted successfully";
   }
   /**
    * Task processing progress.
    * url: http://localhost:8091/task/process
    */
   @RequestMapping(value = "/task/process")
   public ProcessResult processResult(HttpServletRequest request, HttpServletResponse response) throws Exception {
       ProcessResult processResult = new ProcessResult();
       // Business metrics
       processResult.setSourceDataSize(DataUtils.sourceDataSize.longValue());
       processResult.setSuccessCount(DataUtils.handleSuccessSize.longValue());
       processResult.setFailCount(DataUtils.handleFailSize.longValue());
       // System metrics
       processResult.setCustomerMaxPoolSize(DataUtils.consumerExecutor.getMaxPoolSize());
       processResult.setCustomerCorePoolSize(DataUtils.consumerExecutor.getCorePoolSize());
       processResult.setCustomerActiveCount(DataUtils.consumerExecutor.getActiveCount());
       processResult.setCustomerCurrentQueueSize(DataUtils.consumerExecutor.getCurrentQueueSize());
       processResult.setBlockQueueSize(DataUtils.taskQueue.size());
       processResult.setConsumerCurrentQueueSizeLimit(DataUtils.consumerCurrentQueueSizeLimit);
       return processResult;
   }
   ```
   
   Advantages:
   - Convention over configuration: queues buffer the data and multiple threads
   consume it. You only need to implement the data acquisition and consumption
   interfaces according to the conventions, without having to worry about the
   threading details.
   - A thread control module: while a task is running, the core pool size, the
   maximum pool size, and the upper limit of the task queue can be adjusted
   dynamically at any time.
   
   Disadvantages: messages only live in the in-memory buffer, so if there is network
   jitter or the producer crashes, the position of the last successfully consumed
   message is lost and progress cannot be restored.
   In addition, when the task traffic is very heavy or the latency is high,
   backpressure is not handled well.
   
   
   - Buffer pool flush
   Problem addressed: large data volume or high latency.
   Backpressure in stream processing (taking Flink as an example): at runtime Flink
   consists of two major components, operators and streams. Each operator consumes an
   intermediate stream, applies a transformation to it, and produces a new stream. In
   Flink these logical streams behave like distributed blocking queues, and the queue
   capacity is implemented through buffer pools (LocalBufferPool). Each produced and
   consumed stream is assigned a buffer pool, which manages a set of buffers (Buffer)
   that can be recycled after they are consumed.
   
   
![5645654](https://user-images.githubusercontent.com/20021404/229272732-57e6fcbb-6812-40b2-bc4b-5c7f55a42e00.png)
   
   The figure above shows the data transfer between two tasks:
   - Record "A" enters Flink and is processed by Task 1 (omitting the deserialization
   and Netty receiving steps in between)
   - The record is serialized into a buffer (a buffer with free space obtained from
   LocalBufferPool1)
   - The buffer is sent to Task 2, which reads the record from it (LocalBufferPool2
   has free space to receive the buffer)
   
   Can we use this as an idea to improve the first solution?
   
   Now that we plan to refactor EventTargetPusher into a producer-consumer model, the
   production and consumption tasks will naturally be processed in batches. We only
   need to set a threshold on the batched record collection and flush it to the
   downstream event target once that threshold is reached; this lets us control the
   flush timing for each type of task and avoids the blocking caused by flushing
   record by record. A minimal sketch of this idea follows.
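   Here is a minimal sketch of such a threshold-based batch flush; the class name and
   the flush callback are hypothetical, not an existing EventBridge API:
   
   ```
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;
   
   // Hypothetical sketch: buffer records and flush to the event target once a threshold is reached.
   public class BatchingPusher<T> {
       private final List<T> buffer = new ArrayList<>();
       private final int flushThreshold;
       private final Consumer<List<T>> flush; // e.g. pushes the batch to the downstream event target
   
       public BatchingPusher(int flushThreshold, Consumer<List<T>> flush) {
           this.flushThreshold = flushThreshold;
           this.flush = flush;
       }
   
       public synchronized void add(T record) {
           buffer.add(record);
           if (buffer.size() >= flushThreshold) {
               flushNow();
           }
       }
   
       // Also called on a timer or at shutdown so small tails of records are not lost.
       public synchronized void flushNow() {
           if (!buffer.isEmpty()) {
               flush.accept(new ArrayList<>(buffer));
               buffer.clear();
           }
       }
   }
   ```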
   
   Let us take the retrieval of interactive query results for a SELECT statement in
   the flink-sql-gateway project as an example.
   flink-sql-gateway: ```https://github.com/ververica/flink-sql-gateway.git```
   In the code below, the startRetrieval method connects to the running query through
   a SocketStreamIterator and returns the results asynchronously. Here startRetrieval
   corresponds to the pusher's production logic, and the ResultRetrievalThread
   corresponds to its consumption logic. We can keep a changeRecordBuffer in memory
   with a maxBufferSize, and automatically push to the downstream event target once
   that watermark is reached.
   
   ```
   public class ChangelogResult<C> extends AbstractResult<C, Tuple2<Boolean, 
Row>> {
      private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
      ........
      private final CollectStreamTableSink collectTableSink;
      private final ResultRetrievalThread retrievalThread;
      private CompletableFuture<JobExecutionResult> jobExecutionResultFuture;
      private final Object resultLock;
      private AtomicReference<SqlExecutionException> executionException = new 
AtomicReference<>();
      private final List<Tuple2<Boolean, Row>> changeRecordBuffer;
      private final int maxBufferSize;
      public ChangelogResult(
            RowTypeInfo outputType,
            TableSchema tableSchema,
            ExecutionConfig config,
            InetAddress gatewayAddress,
            int gatewayPort,
            ClassLoader classLoader,
            int maxBufferSize) {
         resultLock = new Object();
         // create socket stream iterator
        ......
         retrievalThread = new ResultRetrievalThread();
         // prepare for changelog
         changeRecordBuffer = new ArrayList<>();
         this.maxBufferSize = maxBufferSize;
      }
      @Override
      public void startRetrieval(JobClient jobClient) {
         // start listener thread
         retrievalThread.start();
         jobExecutionResultFuture = CompletableFuture.completedFuture(jobClient)
            .thenCompose(client -> client.getJobExecutionResult(classLoader))
            .whenComplete((unused, throwable) -> {
               if (throwable != null) {
                  executionException.compareAndSet(
                     null,
                     new SqlExecutionException("Error while submitting job.", 
throwable));
               }
            });
      }
      @Override
      public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
         synchronized (resultLock) {
            // retrieval thread is alive return a record if available
            // but the program must not have failed
            if (isRetrieving() && executionException.get() == null) {
               if (changeRecordBuffer.isEmpty()) {
                  return TypedResult.empty();
               } else {
                  final List<Tuple2<Boolean, Row>> change = new 
ArrayList<>(changeRecordBuffer);
                  changeRecordBuffer.clear();
                  resultLock.notify();
                  return TypedResult.payload(change);
               }
            }
            // retrieval thread is dead but there is still a record to be 
delivered
            else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
               final List<Tuple2<Boolean, Row>> change = new 
ArrayList<>(changeRecordBuffer);
               changeRecordBuffer.clear();
               return TypedResult.payload(change);
            }
            // no results can be returned anymore
            else {
               return handleMissingResult();
            }
         }
      }
     ......
      @Override
      public void close() {
         retrievalThread.isRunning = false;
         retrievalThread.interrupt();
         iterator.close();
      }
     .......
      private boolean isRetrieving() {
         return retrievalThread.isRunning;
      }
      private void processRecord(Tuple2<Boolean, Row> change) {
         synchronized (resultLock) {
            // wait if the buffer is full
            if (changeRecordBuffer.size() >= maxBufferSize) {
               try {
                  resultLock.wait();
               } catch (InterruptedException e) {
                  // ignore
               }
            } else {
               changeRecordBuffer.add(change);
            }
         }
      }
      private class ResultRetrievalThread extends Thread {
         public volatile boolean isRunning = true;
         @Override
         public void run() {
            try {
               while (isRunning && iterator.hasNext()) {
                  final Tuple2<Boolean, Row> change = iterator.next();
                  processRecord(change);
               }
            } catch (RuntimeException e) {
               // ignore socket exceptions
            }
            // no result anymore
            // either the job is done or an error occurred
            isRunning = false;
         }
      }
   }
   ```
   
   Since the above does not account for the producer or consumer machine going down,
   we could persist the task processing progress of the first solution, together with
   the information of the topic currently being consumed, whenever an exception is
   thrown. The consumption position could be recorded with the following columns:
   
   ```
   `consumer_group` varchar(128) NOT NULL DEFAULT '',
   `message_id` varchar(255) NOT NULL DEFAULT '',
   `topic_name` varchar(255) NOT NULL DEFAULT '',
   `ctime` bigint(20) NOT NULL,
   `queue_id` int(11) NOT NULL,
   `offset` bigint(20) NOT NULL,
   `broker_name` varchar(255) NOT NULL DEFAULT '',
   `id` bigint(20) NOT NULL AUTO_INCREMENT,
   ```
   
   The next time the task is started, it can resume from the last saved failure point,
   for example as sketched below.
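   
   A rough sketch of how the consumer could record its position on failure and resume
   from it on the next start; the CheckpointStore, CheckpointRecovery, and their
   methods are hypothetical and only meant to match the table above:
   
   ```
   // Hypothetical sketch: persist the consumption position when processing fails,
   // and resume from the last saved position on the next start.
   public class ConsumeCheckpoint {
       public String consumerGroup;
       public String topicName;
       public String brokerName;
       public int queueId;
       public long offset;
       public String messageId;
       public long ctime;
   }
   
   // Backed by the table above; the implementation (MyBatis/JDBC) is out of scope here.
   public interface CheckpointStore {
       void save(ConsumeCheckpoint checkpoint);
       ConsumeCheckpoint loadLatest(String consumerGroup, String topicName);
   }
   
   public class CheckpointRecovery {
       private final CheckpointStore store;
   
       public CheckpointRecovery(CheckpointStore store) {
           this.store = store;
       }
   
       // Called from the consumer task when processing throws: record where we stopped.
       public void onFailure(ConsumeCheckpoint position) {
           position.ctime = System.currentTimeMillis();
           store.save(position);
           DataUtils.handleFailSize.incrementAndGet();
       }
   
       // Called when the task is started again: seek back to the last saved offset.
       public long resumeOffset(String consumerGroup, String topicName) {
           ConsumeCheckpoint checkpoint = store.loadLatest(consumerGroup, topicName);
           return checkpoint != null ? checkpoint.offset : 0L;
       }
   }
   ```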

