jevinjiang commented on issue #4788: URL: https://github.com/apache/eventmesh/issues/4788#issuecomment-2014234785
Hi here is my design. please let me know if there are any issues! ` @Slf4j public class Channel implements LifeCycle { public static final Integer DEFAULT_SIZE = 4096 << 1 << 1; @Getter private DisruptorProvider provider; private final Integer size; private final Subscribe subscribe; private volatile boolean started = false; public Channel(Subscribe subscribe) { this(DEFAULT_SIZE, subscribe); } public Channel(final Integer ringBufferSize, Subscribe subscribe) { this.size = ringBufferSize; this.subscribe = subscribe; } @Override public boolean isStarted() { return started; } @Override public boolean isClosed() { return !isStarted(); } public synchronized void start() { if (isClosed()) { doStart(); started = true; } } public void doStart() { Disruptor<MessageEntity> disruptor = new Disruptor<>( DisruptorEventFactory.instance(), size, DisruptorThreadFactory.create("standalone_disruptor_provider_" + subscribe.getTopicName(), false), ProducerType.MULTI, new BlockingWaitStrategy() ); disruptor.handleEventsWith(subscribe); disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); disruptor.start(); RingBuffer<MessageEntity> ringBuffer = disruptor.getRingBuffer(); provider = new DisruptorProvider(ringBuffer, disruptor); } public int getMessageCount() { return provider.getMessageCount(); } @Override public synchronized void shutdown() { if (isStarted()) { provider.shutdown(); provider = null; started = false; } } }` ` public class StandaloneBrokerByDisruptor { private final ConcurrentHashMap<TopicMetadata, Channel> messageContainer; private StandaloneBrokerByDisruptor() { this.messageContainer = new ConcurrentHashMap<>(); } public ConcurrentHashMap<TopicMetadata, Channel> getMessageContainer() { return this.messageContainer; } public static StandaloneBrokerByDisruptor getInstance() { return StandaloneBrokerInstanceHolder.instance; } /** * put message * * @param topicName topic name * @param message message * @throws InterruptedException */ public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException { TopicMetadata topicMetadata = new TopicMetadata(topicName); if (messageContainer.containsKey(topicMetadata)) { throw new StorageRuntimeException("the topic not subscribed"); } Channel channel = messageContainer.get(topicMetadata); MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message); channel.getProvider().onData(messageEntity); return messageEntity; } public void createTopicIfAbsent(String topicName, Subscribe subscribe) { TopicMetadata topicMetadata = new TopicMetadata(topicName); Channel channel = new Channel(subscribe); channel.start(); messageContainer.put(topicMetadata, channel); } /** * if the topic exists, delete the topic * * @param topicName topicName */ public void deleteTopicIfExist(String topicName) { TopicMetadata topicMetadata = new TopicMetadata(topicName); Channel channel = messageContainer.get(topicMetadata); channel.shutdown(); messageContainer.remove(topicMetadata); } private static class StandaloneBrokerInstanceHolder { private static final StandaloneBrokerByDisruptor instance = new StandaloneBrokerByDisruptor(); } } ` Message queue replaced with Channel ` @slf4j public class Subscribe implements WorkHandler, EventHandler { @Getter private final String topicName; private final StandaloneBrokerByDisruptor standaloneBroker; private final EventListener listener; @Getter private volatile boolean isRunning; private final ExecutorService consumeExecutorService; public Subscribe(String topicName, StandaloneBrokerByDisruptor standaloneBroker, EventListener listener, ExecutorService consumeExecutorService) { this.topicName = topicName; this.standaloneBroker = standaloneBroker; this.listener = listener; this.isRunning = true; this.consumeExecutorService = consumeExecutorService; } public void subscribe() { standaloneBroker.createTopicIfAbsent(topicName, this); } public void shutdown() { standaloneBroker.deleteTopicIfExist(topicName); isRunning = false; } @Override public void onEvent(MessageEntity messageEntity) { if (isRunning) { consumeExecutorService.execute(() -> doWork(messageEntity)); } } @Override public void onEvent(MessageEntity messageEntity, long l, boolean b) throws Exception { this.onEvent(messageEntity); } public void doWork(MessageEntity messageEntity) { long offset = messageEntity.getOffset(); CloudEvent message = messageEntity.getMessage(); log.debug("execute subscribe task, topic: {}, offset: {}", topicName, offset); try { EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() { @Override public void commit(EventMeshAction action) { switch (action) { case CommitMessage: // update offset log.info("message commit, topic: {}, current offset:{}", topicName, offset); break; case ManualAck: // update offset log.info("message ack, topic: {}, current offset:{}", topicName, offset); break; case ReconsumeLater: default: } } }; listener.consume(message, consumeContext); } catch (Exception ex) { log.error("consumer error, topic: {}, offset: {}", topicName, offset, ex); } } }` Topic is triggered by subscribers to create, and unsubscribed topics cannot be sent messages ` public class StandaloneAdmin extends AbstractAdmin { private final StandaloneBrokerByDisruptor standaloneBroker; public StandaloneAdmin() { super(new AtomicBoolean(false)); this.standaloneBroker = StandaloneBrokerByDisruptor.getInstance(); } @Override public List<TopicProperties> getTopic() throws Exception { ConcurrentHashMap<TopicMetadata, Channel> messageContainer = this.standaloneBroker.getMessageContainer(); List<TopicProperties> topicList = new ArrayList<>(); messageContainer.keySet().forEach(topicMetadata -> { Channel channel = messageContainer.get(topicMetadata); final int messageCount = channel.getMessageCount(); topicList.add(new TopicProperties( topicMetadata.getTopicName(), messageCount)); }); topicList.sort(Comparator.comparing(t -> t.name)); return topicList; } @Override public void createTopic(String topicName) { } @Override public void deleteTopic(String topicName) { standaloneBroker.deleteTopicIfExist(topicName); } @Override public void publish(CloudEvent cloudEvent) throws Exception { this.standaloneBroker.putMessage(cloudEvent.getSubject(), cloudEvent); } } ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For additional commands, e-mail: issues-h...@eventmesh.apache.org