jevinjiang commented on issue #4788:
URL: https://github.com/apache/eventmesh/issues/4788#issuecomment-2014234785

   Hi, here is my design. Please let me know if there are any issues!
   `
   /**
    * Message channel backed by a Disruptor ring buffer whose events are handled by a
    * single {@link Subscribe}.
    *
    * <p>The ring buffer is created in {@link #start()} and released in {@link #shutdown()};
    * both are synchronized on this instance and are no-ops when the channel is already in
    * the requested state.
    */
   @Slf4j
   public class Channel implements LifeCycle {

       /** Ring buffer size used when none is given: {@code 4096 << 1 << 1}, i.e. 16384 slots. */
       public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;

       /**
        * Provider wrapping the running ring buffer; {@code null} while the channel is not started.
        * Volatile because the generated getter and {@link #getMessageCount()} read it without
        * holding the lock taken by {@link #start()} and {@link #shutdown()}.
        */
       @Getter
       private volatile DisruptorProvider provider;
       private final Integer size;
       private final Subscribe subscribe;
       private volatile boolean started = false;

       /**
        * Creates a channel with {@link #DEFAULT_SIZE} ring buffer slots.
        *
        * @param subscribe handler that receives every event published to this channel
        */
       public Channel(Subscribe subscribe) {
           this(DEFAULT_SIZE, subscribe);
       }

       /**
        * Creates a channel with an explicit ring buffer size.
        *
        * @param ringBufferSize number of ring buffer slots, passed unchanged to the Disruptor
        *                       constructor (NOTE(review): Disruptor expects a power of two - confirm callers)
        * @param subscribe      handler that receives every event published to this channel
        */
       public Channel(final Integer ringBufferSize, Subscribe subscribe) {
           this.size = ringBufferSize;
           this.subscribe = subscribe;
       }

       /** @return {@code true} once {@link #start()} has completed and until {@link #shutdown()} */
       @Override
       public boolean isStarted() {
           return started;
       }

       /** @return the negation of {@link #isStarted()} */
       @Override
       public boolean isClosed() {
           return !isStarted();
       }

       /** Starts the channel if it is not running; does nothing when already started. */
       public synchronized void start() {
           if (isClosed()) {
               doStart();
               started = true;
           }
       }

       /**
        * Builds and starts the Disruptor, then publishes it through {@link #provider}.
        * Does not check or update the started flag; {@link #start()} does that.
        */
       public void doStart() {
           Disruptor<MessageEntity> disruptor = new Disruptor<>(
               DisruptorEventFactory.instance(),
               size,
               DisruptorThreadFactory.create("standalone_disruptor_provider_" + subscribe.getTopicName(), false),
               ProducerType.MULTI,
               new BlockingWaitStrategy()
           );

           disruptor.handleEventsWith(subscribe);
           disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
           disruptor.start();
           RingBuffer<MessageEntity> ringBuffer = disruptor.getRingBuffer();
           provider = new DisruptorProvider(ringBuffer, disruptor);
       }

       /**
        * Reports the message count of the underlying provider.
        *
        * @return the provider's message count, or {@code 0} when the channel is not started
        */
       public int getMessageCount() {
           // Read the field once: shutdown() may null it between a check and a use.
           final DisruptorProvider current = provider;
           return current == null ? 0 : current.getMessageCount();
       }

       /** Shuts the provider down and clears it; does nothing when the channel is not started. */
       @Override
       public synchronized void shutdown() {
           if (isStarted()) {
               provider.shutdown();
               provider = null;
               started = false;
           }
       }
   }`
   
   `
   /**
    * Singleton in-memory broker that keeps one {@link Channel} per topic.
    *
    * <p>Topics are registered through {@link #createTopicIfAbsent(String, Subscribe)};
    * publishing to a topic that has not been registered is rejected.
    */
   public class StandaloneBrokerByDisruptor {

       private final ConcurrentHashMap<TopicMetadata, Channel> messageContainer;

       private StandaloneBrokerByDisruptor() {
           this.messageContainer = new ConcurrentHashMap<>();
       }

       /**
        * @return the live topic-to-channel map (not a copy)
        */
       public ConcurrentHashMap<TopicMetadata, Channel> getMessageContainer() {
           return this.messageContainer;
       }

       /**
        * @return the process-wide broker instance
        */
       public static StandaloneBrokerByDisruptor getInstance() {
           return StandaloneBrokerInstanceHolder.instance;
       }

       /**
        * Wraps the event in a {@link MessageEntity} and hands it to the topic's channel.
        *
        * @param topicName topic name
        * @param message   message
        * @return the entity that was handed to the channel
        * @throws StorageRuntimeException if no channel is registered for the topic
        * @throws InterruptedException declared for callers; presumably raised when publishing
        *                              is interrupted - confirm against DisruptorProvider
        */
       public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException {
           TopicMetadata topicMetadata = new TopicMetadata(topicName);
           // Single lookup: the channel must exist, otherwise nobody has subscribed to the topic.
           Channel channel = messageContainer.get(topicMetadata);
           if (channel == null) {
               throw new StorageRuntimeException("the topic not subscribed");
           }
           MessageEntity messageEntity = new MessageEntity(topicMetadata, message);
           channel.getProvider().onData(messageEntity);
           return messageEntity;
       }

       /**
        * Registers and starts a channel for the topic unless one is already registered.
        * An existing channel is kept as-is, so its running ring buffer is not replaced or leaked.
        *
        * @param topicName topic name
        * @param subscribe handler attached to the channel when it is created
        */
       public void createTopicIfAbsent(String topicName, Subscribe subscribe) {
           messageContainer.computeIfAbsent(new TopicMetadata(topicName), key -> {
               // Start before the channel becomes visible in the map so publishers never see an unstarted channel.
               Channel channel = new Channel(subscribe);
               channel.start();
               return channel;
           });
       }

       /**
        * if the topic exists, delete the topic
        *
        * @param topicName topicName
        */
       public void deleteTopicIfExist(String topicName) {
           // remove() returns the previous mapping, so absence is handled without a separate lookup.
           Channel channel = messageContainer.remove(new TopicMetadata(topicName));
           if (channel != null) {
               channel.shutdown();
           }
       }

       /** Holder class; the broker is created when this class is first initialized. */
       private static class StandaloneBrokerInstanceHolder {
           private static final StandaloneBrokerByDisruptor instance = new StandaloneBrokerByDisruptor();
       }
   }
   `
   The message queue is replaced with Channel.
   `
   /**
    * Disruptor handler for one topic: every event it receives is handed to an executor,
    * which delivers the contained CloudEvent to the configured {@link EventListener}.
    */
   @Slf4j
   public class Subscribe implements WorkHandler<MessageEntity>, EventHandler<MessageEntity> {

       @Getter
       private final String topicName;
       private final StandaloneBrokerByDisruptor standaloneBroker;
       private final EventListener listener;
       @Getter
       private volatile boolean isRunning;

       private final ExecutorService consumeExecutorService;

       /**
        * @param topicName              topic this subscription handles
        * @param standaloneBroker       broker on which the topic is registered and removed
        * @param listener               receiver of every consumed message
        * @param consumeExecutorService executor on which the listener is invoked
        */
       public Subscribe(String topicName,
           StandaloneBrokerByDisruptor standaloneBroker,
           EventListener listener, ExecutorService consumeExecutorService) {
           this.topicName = topicName;
           this.standaloneBroker = standaloneBroker;
           this.listener = listener;
           this.isRunning = true;
           this.consumeExecutorService = consumeExecutorService;
       }

       /** Registers this handler's topic on the broker. */
       public void subscribe() {
           standaloneBroker.createTopicIfAbsent(topicName, this);
       }

       /** Stops dispatching events and removes the topic from the broker. */
       public void shutdown() {
           // Clear the flag first so no event is dispatched during teardown, even if teardown throws.
           isRunning = false;
           standaloneBroker.deleteTopicIfExist(topicName);
       }

       /**
        * Dispatches the event to the consume executor; events are dropped once
        * {@link #shutdown()} has been called.
        *
        * @param messageEntity event taken from the ring buffer
        */
       @Override
       public void onEvent(MessageEntity messageEntity) {
           if (isRunning) {
               consumeExecutorService.execute(() -> doWork(messageEntity));
           }
       }

       /**
        * Delegates to {@link #onEvent(MessageEntity)}; the sequence and end-of-batch flag are ignored.
        */
       @Override
       public void onEvent(MessageEntity messageEntity, long sequence, boolean endOfBatch) throws Exception {
           this.onEvent(messageEntity);
       }

       /**
        * Delivers one message to the listener. Any exception thrown by the listener is
        * logged and not propagated.
        *
        * @param messageEntity entity carrying the CloudEvent and its offset
        */
       public void doWork(MessageEntity messageEntity) {
           long offset = messageEntity.getOffset();
           CloudEvent message = messageEntity.getMessage();
           log.debug("execute subscribe task, topic: {}, offset: {}", topicName, offset);

           try {
               EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {

                   @Override
                   public void commit(EventMeshAction action) {
                       switch (action) {
                           case CommitMessage:
                               // update offset
                               log.info("message commit, topic: {}, current offset:{}", topicName, offset);
                               break;
                           case ManualAck:
                               // update offset
                               log.info("message ack, topic: {}, current offset:{}", topicName, offset);
                               break;
                           case ReconsumeLater:
                           default:
                               // nothing to do for these actions
                               break;
                       }
                   }
               };
               listener.consume(message, consumeContext);
           } catch (Exception ex) {
               log.error("consumer error, topic: {}, offset: {}", topicName, offset, ex);
           }
       }
   }`
   
   Topics are created when a subscriber subscribes; messages cannot be sent to 
a topic that has no subscriber.
   
   `
   /**
    * Admin facade over the singleton {@link StandaloneBrokerByDisruptor}: lists topics,
    * deletes topics and publishes events.
    */
   public class StandaloneAdmin extends AbstractAdmin {

       private final StandaloneBrokerByDisruptor standaloneBroker;

       public StandaloneAdmin() {
           super(new AtomicBoolean(false));
           this.standaloneBroker = StandaloneBrokerByDisruptor.getInstance();
       }

       /**
        * Lists every topic registered on the broker with its channel's message count.
        *
        * @return topic properties sorted by topic name
        */
       @Override
       public List<TopicProperties> getTopic() throws Exception {
           ConcurrentHashMap<TopicMetadata, Channel> messageContainer = this.standaloneBroker.getMessageContainer();
           List<TopicProperties> topicList = new ArrayList<>();
           // Iterate key/value pairs directly: a key-then-get lookup can return null
           // when a topic is removed while the listing is in progress.
           messageContainer.forEach((topicMetadata, channel) ->
               topicList.add(new TopicProperties(
                   topicMetadata.getTopicName(),
                   channel.getMessageCount())));
           topicList.sort(Comparator.comparing(t -> t.name));
           return topicList;
       }

       /**
        * Intentionally does nothing: the broker registers a topic only together with a
        * subscriber, via {@code createTopicIfAbsent(topicName, subscribe)}.
        */
       @Override
       public void createTopic(String topicName) {
       }

       /**
        * Removes the topic from the broker.
        *
        * @param topicName topic to delete
        */
       @Override
       public void deleteTopic(String topicName) {
           standaloneBroker.deleteTopicIfExist(topicName);
       }

       /**
        * Publishes the event to the topic named by the event's subject.
        *
        * @param cloudEvent event to publish
        */
       @Override
       public void publish(CloudEvent cloudEvent) throws Exception {
           this.standaloneBroker.putMessage(cloudEvent.getSubject(), cloudEvent);
       }
   }
   
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org
For additional commands, e-mail: issues-h...@eventmesh.apache.org

Reply via email to