This is an automated email from the ASF dual-hosted git repository.

xiatian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 50a36aadf [ISSUE #4788] Support disruptor as memory queue (#4844)
50a36aadf is described below

commit 50a36aadf3a13b218743bf0d4a8707540f7c051d
Author: Jevin Jiang <[email protected]>
AuthorDate: Mon Jul 15 16:06:09 2024 +0800

    [ISSUE #4788] Support disruptor as memory queue (#4844)
    
    * [ISSUE #4788] Support disruptor as memory queue
    
    * [ISSUE #4788] fix code style
    
    ---------
    
    Co-authored-by: JiangShuJu <[email protected]>
---
 .../eventmesh-storage-standalone/build.gradle      |   1 +
 .../storage/standalone/admin/StandaloneAdmin.java  |  26 +----
 .../storage/standalone/broker/Channel.java         | 105 +++++++++++++++++++
 .../standalone/broker/StandaloneBroker.java        | 113 ++++++++++-----------
 .../standalone/broker/model/MessageEntity.java     |   8 ++
 .../broker/provider/DisruptorProvider.java         | 109 ++++++++++++++++++++
 .../storage/standalone/broker/task/Subscribe.java  |  87 ++++++++--------
 .../standalone/broker/task/SubscribeTask.java      |  51 ----------
 .../standalone/consumer/StandaloneConsumer.java    |  16 +--
 .../eventmesh/storage/standalone/TestUtils.java    |  31 +++++-
 .../standalone/admin/StandaloneAdminTest.java      |  39 +++----
 .../standalone/broker/StandaloneBrokerTest.java    |  51 +++-------
 .../standalone/broker/task/SubscribeTest.java      |  10 +-
 .../producer/StandaloneProducerTest.java           |   8 ++
 14 files changed, 385 insertions(+), 270 deletions(-)

diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
index a8b1827aa..22271fb57 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
@@ -18,6 +18,7 @@
 dependencies {
     implementation project(":eventmesh-common")
     implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
+    implementation "com.lmax:disruptor"
 
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
index 7f5ab2da6..72257647a 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
@@ -19,7 +19,7 @@ package org.apache.eventmesh.storage.standalone.admin;
 
 import org.apache.eventmesh.api.admin.AbstractAdmin;
 import org.apache.eventmesh.api.admin.TopicProperties;
-import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
+import org.apache.eventmesh.storage.standalone.broker.Channel;
 import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
 import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
 
@@ -42,11 +42,11 @@ public class StandaloneAdmin extends AbstractAdmin {
 
     @Override
     public List<TopicProperties> getTopic() throws Exception {
-        ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = 
this.standaloneBroker.getMessageContainer();
+        ConcurrentHashMap<TopicMetadata, Channel> messageContainer = 
this.standaloneBroker.getMessageContainer();
         List<TopicProperties> topicList = new ArrayList<>();
         messageContainer.keySet().forEach(topicMetadata -> {
-            MessageQueue messageQueue = messageContainer.get(topicMetadata);
-            final int messageCount = messageQueue.getPutIndex() - 
messageQueue.getTakeIndex();
+            Channel channel = messageContainer.get(topicMetadata);
+            final int messageCount = channel.getMessageCount();
             topicList.add(new TopicProperties(
                 topicMetadata.getTopicName(),
                 messageCount));
@@ -65,25 +65,7 @@ public class StandaloneAdmin extends AbstractAdmin {
         standaloneBroker.deleteTopicIfExist(topicName);
     }
 
-    @Override
-    public List<CloudEvent> getEvent(String topicName, int offset, int length) 
throws Exception {
-        if (!this.standaloneBroker.checkTopicExist(topicName)) {
-            throw new Exception("The topic name doesn't exist in the message 
queue");
-        }
-        ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = 
this.standaloneBroker.getMessageContainer();
-        long topicOffset = messageContainer.get(new 
TopicMetadata(topicName)).getTakeIndex();
 
-        List<CloudEvent> messageList = new ArrayList<>();
-        for (int index = 0; index < length; index++) {
-            long messageOffset = topicOffset + offset + index;
-            CloudEvent event = this.standaloneBroker.getMessage(topicName, 
messageOffset);
-            if (event == null) {
-                break;
-            }
-            messageList.add(event);
-        }
-        return messageList;
-    }
 
     @Override
     public void publish(CloudEvent cloudEvent) throws Exception {
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
new file mode 100644
index 000000000..2ea7310b8
--- /dev/null
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.storage.standalone.broker;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
+import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
+import 
org.apache.eventmesh.storage.standalone.broker.provider.DisruptorProvider;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.IgnoreExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+import lombok.Getter;
+
+
+public class Channel implements LifeCycle {
+
+    public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
+    @Getter
+    private DisruptorProvider provider;
+    private final Integer size;
+    private final EventHandler<MessageEntity> eventHandler;
+    private volatile boolean started = false;
+    private final TopicMetadata topic;
+    private static final String THREAD_NAME_PREFIX = 
"standalone_disruptor_provider_";
+
+    public Channel(TopicMetadata topic, EventHandler<MessageEntity> 
eventHandler) {
+        this(DEFAULT_SIZE, topic, eventHandler);
+    }
+
+
+    public Channel(final Integer ringBufferSize, final TopicMetadata topic, 
final EventHandler<MessageEntity> eventHandler) {
+        this.size = ringBufferSize;
+        this.topic = topic;
+        this.eventHandler = eventHandler;
+    }
+
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return !isStarted();
+    }
+
+    public synchronized void start() {
+        if (isClosed()) {
+            doStart();
+            started = true;
+        }
+    }
+
+    public void doStart() {
+        Disruptor<MessageEntity> disruptor = new Disruptor<>(
+            MessageEntity::new,
+            size,
+            new EventMeshThreadFactory(THREAD_NAME_PREFIX + 
topic.getTopicName(), true),
+            ProducerType.MULTI,
+            new BlockingWaitStrategy()
+        );
+
+        disruptor.handleEventsWith(eventHandler);
+        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
+        RingBuffer<MessageEntity> ringBuffer = disruptor.getRingBuffer();
+        provider = new DisruptorProvider(ringBuffer, disruptor);
+        provider.start();
+    }
+
+    public int getMessageCount() {
+        return provider.getMessageCount();
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        if (isStarted()) {
+            provider.shutdown();
+            provider = null;
+            started = false;
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
index 5e64b40a7..8654b2d1c 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
@@ -19,42 +19,36 @@ package org.apache.eventmesh.storage.standalone.broker;
 
 import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
 import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
-import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClear;
-import 
org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClearTask;
-
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 import io.cloudevents.CloudEvent;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * This broker used to store event, it just support standalone mode, you 
shouldn't use this module in production environment
  */
+@Slf4j
 public class StandaloneBroker {
 
-    private final ConcurrentHashMap<TopicMetadata, MessageQueue> 
messageContainer;
+    // message source by topic
+    @Getter
+    private final ConcurrentHashMap<TopicMetadata, Channel> messageContainer;
 
-    // todo: move the offset manage to consumer
-    private final ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap;
+    @Getter
+    private final ConcurrentHashMap<TopicMetadata, Subscribe> 
subscribeContainer;
 
     private StandaloneBroker() {
         this.messageContainer = new ConcurrentHashMap<>();
-        this.offsetMap = new ConcurrentHashMap<>();
-        startHistoryMessageCleanTask();
-    }
-
-    public ConcurrentHashMap<TopicMetadata, MessageQueue> 
getMessageContainer() {
-        return this.messageContainer;
+        this.subscribeContainer = new ConcurrentHashMap<>();
     }
 
-    public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
-        return this.offsetMap;
-    }
 
     public static StandaloneBroker getInstance() {
-        return StandaloneBrokerInstanceHolder.instance;
+        return StandaloneBrokerInstanceHolder.INSTANCE;
     }
 
     /**
@@ -62,28 +56,38 @@ public class StandaloneBroker {
      *
      * @param topicName topic name
      * @param message   message
-     * @throws InterruptedException
      */
-    public MessageEntity putMessage(String topicName, CloudEvent message) 
throws InterruptedException {
-        Pair<MessageQueue, AtomicLong> pair = createTopicIfAbsent(topicName);
-        AtomicLong topicOffset = pair.getRight();
-        MessageQueue messageQueue = pair.getLeft();
-
-        MessageEntity messageEntity = new MessageEntity(
-            new TopicMetadata(topicName), message, 
topicOffset.getAndIncrement(), System.currentTimeMillis());
-        messageQueue.put(messageEntity);
-
+    public MessageEntity putMessage(String topicName, CloudEvent message) {
+        TopicMetadata topicMetadata = new TopicMetadata(topicName);
+        if (!messageContainer.containsKey(topicMetadata)) {
+            createTopic(topicName);
+        }
+        Channel channel = messageContainer.get(topicMetadata);
+        MessageEntity messageEntity = new MessageEntity(new 
TopicMetadata(topicName), message);
+        channel.getProvider().onData(messageEntity);
         return messageEntity;
     }
 
+    public Channel createTopic(String topicName) {
+        TopicMetadata topicMetadata = new TopicMetadata(topicName);
+        return messageContainer.computeIfAbsent(topicMetadata, k -> {
+            Subscribe subscribe = subscribeContainer.get(topicMetadata);
+            if (subscribe == null) {
+                throw new IllegalStateException("the topic not exist subscribe 
");
+            }
+            Channel channel = new Channel(topicMetadata, subscribe);
+            channel.start();
+            return channel;
+        });
+    }
+
     /**
      * Get the message, if the queue is empty then await
      *
      * @param topicName
      */
     public CloudEvent takeMessage(String topicName) throws 
InterruptedException {
-        TopicMetadata topicMetadata = new TopicMetadata(topicName);
-        return messageContainer.computeIfAbsent(topicMetadata, k -> new 
MessageQueue()).take().getMessage();
+        return null;
     }
 
     /**
@@ -92,12 +96,7 @@ public class StandaloneBroker {
      * @param topicName
      */
     public CloudEvent getMessage(String topicName) {
-        TopicMetadata topicMetadata = new TopicMetadata(topicName);
-        MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k 
-> new MessageQueue()).getHead();
-        if (head == null) {
-            return null;
-        }
-        return head.getMessage();
+        return null;
     }
 
     /**
@@ -108,21 +107,9 @@ public class StandaloneBroker {
      * @return CloudEvent
      */
     public CloudEvent getMessage(String topicName, long offset) {
-        TopicMetadata topicMetadata = new TopicMetadata(topicName);
-        MessageEntity messageEntity = 
messageContainer.computeIfAbsent(topicMetadata, k -> new 
MessageQueue()).getByOffset(offset);
-        if (messageEntity == null) {
-            return null;
-        }
-        return messageEntity.getMessage();
+        return null;
     }
 
-    private void startHistoryMessageCleanTask() {
-        HistoryMessageClear historyMessageClear = new 
HistoryMessageClear(messageContainer);
-        Thread thread = new Thread(new 
HistoryMessageClearTask(historyMessageClear));
-        thread.setDaemon(true);
-        thread.setName("StandaloneBroker-HistoryMessageCleanTask");
-        thread.start();
-    }
 
     public boolean checkTopicExist(String topicName) {
         return messageContainer.containsKey(new TopicMetadata(topicName));
@@ -132,13 +119,10 @@ public class StandaloneBroker {
      * if the topic does not exist, create the topic
      *
      * @param topicName topicName
-     * @return messageQueue and offset
+     * @return Channel
      */
-    public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String 
topicName) {
-        TopicMetadata topicMetadata = new TopicMetadata(topicName);
-        MessageQueue messageQueue = 
messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue());
-        AtomicLong offset = offsetMap.computeIfAbsent(topicMetadata, k -> new 
AtomicLong());
-        return Pair.of(messageQueue, offset);
+    public Channel createTopicIfAbsent(String topicName) {
+        return createTopic(topicName);
     }
 
     /**
@@ -148,18 +132,23 @@ public class StandaloneBroker {
      */
     public void deleteTopicIfExist(String topicName) {
         TopicMetadata topicMetadata = new TopicMetadata(topicName);
+        Channel channel = createTopicIfAbsent(topicName);
+        channel.shutdown();
         messageContainer.remove(topicMetadata);
     }
 
-    public void updateOffset(TopicMetadata topicMetadata, long offset) {
-        offsetMap.computeIfPresent(topicMetadata, (k, v) -> {
-            v.set(offset);
-            return v;
-        });
+    public void subscribed(String topicName, Subscribe subscribe) {
+        TopicMetadata topicMetadata = new TopicMetadata(topicName);
+        if (getMessageContainer().containsKey(topicMetadata)) {
+            log.warn("the topic already subscribed");
+            return;
+        }
+        subscribeContainer.put(topicMetadata, subscribe);
     }
 
+
     private static class StandaloneBrokerInstanceHolder {
 
-        private static final StandaloneBroker instance = new 
StandaloneBroker();
+        private static final StandaloneBroker INSTANCE = new 
StandaloneBroker();
     }
-}
+}
\ No newline at end of file
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
index 0f437aee0..3662b3025 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
@@ -21,6 +21,9 @@ import java.io.Serializable;
 
 import io.cloudevents.CloudEvent;
 
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
 public class MessageEntity implements Serializable {
 
     private static final long serialVersionUID = 6646148767540524786L;
@@ -40,6 +43,11 @@ public class MessageEntity implements Serializable {
         this.createTimeMills = currentTimeMills;
     }
 
+    public MessageEntity(TopicMetadata topicMetadata, CloudEvent message) {
+        this.topicMetadata = topicMetadata;
+        this.message = message;
+    }
+
     public TopicMetadata getTopicMetadata() {
         return topicMetadata;
     }
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java
new file mode 100644
index 000000000..47b2665a2
--- /dev/null
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.storage.standalone.broker.provider;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
+
+import com.lmax.disruptor.EventTranslatorOneArg;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * DisruptorProvider. disruptor provider definition.
+ */
+@Slf4j
+public class DisruptorProvider implements LifeCycle {
+
+    private final RingBuffer<MessageEntity> ringBuffer;
+
+    private final Disruptor<MessageEntity> disruptor;
+
+    private volatile boolean start = false;
+
+    private final EventTranslatorOneArg<MessageEntity, MessageEntity> 
translatorOneArg = (messageEntity, sequence, arg0) -> {
+        arg0.setOffset(sequence);
+        arg0.setCreateTimeMills(System.currentTimeMillis());
+        messageEntity.setOffset(arg0.getOffset());
+        messageEntity.setCreateTimeMills(arg0.getCreateTimeMills());
+        messageEntity.setTopicMetadata(arg0.getTopicMetadata());
+        messageEntity.setMessage(arg0.getMessage());
+    };
+
+
+    /**
+     * Instantiates a new Disruptor provider.
+     *
+     * @param ringBuffer the ring buffer
+     * @param disruptor  the disruptor
+     */
+    public DisruptorProvider(final RingBuffer<MessageEntity> ringBuffer, final 
Disruptor<MessageEntity> disruptor) {
+        this.ringBuffer = ringBuffer;
+        this.disruptor = disruptor;
+    }
+
+    /**
+     * @param data the data
+     */
+    public MessageEntity onData(final MessageEntity data) {
+        if (isClosed()) {
+            throw new IllegalArgumentException("the disruptor is close");
+        }
+        try {
+            ringBuffer.publishEvent(translatorOneArg, data);
+        } catch (Exception ex) {
+            throw new IllegalStateException("send data fail.");
+        }
+        return data;
+    }
+
+
+    @Override
+    public boolean isStarted() {
+        return start;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return !isStarted();
+    }
+
+    @Override
+    public void start() {
+        if (null != disruptor) {
+            disruptor.start();
+            start = true;
+        }
+    }
+
+    /**
+     * Shutdown.
+     */
+    public void shutdown() {
+        if (null != disruptor) {
+            disruptor.shutdown();
+            start = false;
+        }
+    }
+
+    public int getMessageCount() {
+        return ringBuffer.getBufferSize();
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
index 8316270ad..4c84849ac 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
@@ -21,16 +21,19 @@ import org.apache.eventmesh.api.EventListener;
 import org.apache.eventmesh.api.EventMeshAction;
 import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
 
-import java.util.concurrent.atomic.AtomicInteger;
 
 import io.cloudevents.CloudEvent;
 
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.WorkHandler;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class Subscribe {
+public class Subscribe implements WorkHandler<MessageEntity>, 
EventHandler<MessageEntity> {
 
     @Getter
     private final String topicName;
@@ -38,8 +41,6 @@ public class Subscribe {
     private final EventListener listener;
     @Getter
     private volatile boolean isRunning;
-    @Getter
-    private AtomicInteger offset;
 
     public Subscribe(String topicName,
         StandaloneBroker standaloneBroker,
@@ -51,52 +52,50 @@ public class Subscribe {
     }
 
     public void subscribe() {
+        standaloneBroker.subscribed(topicName, this);
+    }
+
+    public void shutdown() {
+        isRunning = false;
+        standaloneBroker.deleteTopicIfExist(topicName);
+    }
+
+    @Override
+    public void onEvent(MessageEntity event, long sequence, boolean 
endOfBatch) {
+        onEvent(event);
+    }
+
+    @Override
+    public void onEvent(MessageEntity event) {
         try {
-            log.debug("execute subscribe task, topic: {}, offset: {}", 
topicName, offset);
-            if (offset == null) {
-                CloudEvent message = standaloneBroker.getMessage(topicName);
-                if (message != null) {
-                    Object tmpOffset = message.getExtension("offset");
-                    if (tmpOffset instanceof Integer) {
-                        offset = new 
AtomicInteger(Integer.parseInt(tmpOffset.toString()));
-                    } else {
-                        offset = new AtomicInteger(0);
-                    }
-                }
+            if (!isRunning) {
+                return;
             }
-            if (offset != null) {
-                CloudEvent message = standaloneBroker.getMessage(topicName, 
offset.get());
-                if (message != null) {
-                    EventMeshAsyncConsumeContext consumeContext = new 
EventMeshAsyncConsumeContext() {
+            CloudEvent message = event.getMessage();
+            if (message != null) {
+                EventMeshAsyncConsumeContext consumeContext = new 
EventMeshAsyncConsumeContext() {
 
-                        @Override
-                        public void commit(EventMeshAction action) {
-                            switch (action) {
-                                case CommitMessage:
-                                    // update offset
-                                    log.info("message commit, topic: {}, 
current offset:{}", topicName, offset.get());
-                                    break;
-                                case ManualAck:
-                                    // update offset
-                                    offset.incrementAndGet();
-                                    log.info("message ack, topic: {}, current 
offset:{}", topicName, offset.get());
-                                    break;
-                                case ReconsumeLater:
-                                default:
-
-                            }
+                    @Override
+                    public void commit(EventMeshAction action) {
+                        switch (action) {
+                            case CommitMessage:
+                                // update offset
+                                log.info("message commit, topic: {}, current 
offset:{}", topicName, event.getOffset());
+                                break;
+                            case ManualAck:
+                                // update offset
+                                log.info("message ack, topic: {}, current 
offset:{}", topicName, event.getOffset());
+                                break;
+                            case ReconsumeLater:
+                            default:
                         }
-                    };
-                    listener.consume(message, consumeContext);
-                }
+                    }
+                };
+                listener.consume(message, consumeContext);
             }
         } catch (Exception ex) {
-            log.error("consumer error, topic: {}, offset: {}", topicName, 
offset == null ? null : offset.get(), ex);
+            log.error("consumer error, topic: {}, offset: {}", topicName, 
event.getOffset(), ex);
         }
     }
 
-    public void shutdown() {
-        isRunning = false;
-    }
-
-}
+}
\ No newline at end of file
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java
deleted file mode 100644
index 0936c7925..000000000
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.storage.standalone.broker.task;
-
-import org.apache.eventmesh.common.utils.ThreadUtils;
-
-import java.util.concurrent.TimeUnit;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class SubscribeTask implements Runnable {
-
-    private Subscribe subscribe;
-
-    public SubscribeTask(Subscribe subscribe) {
-        this.subscribe = subscribe;
-    }
-
-    @Override
-    public void run() {
-        while (subscribe.isRunning()) {
-            subscribe.subscribe();
-            try {
-                ThreadUtils.sleepWithThrowException(1, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                log.error("Thread is interrupted, topic: {}, offset: {} thread 
name: {}",
-                    subscribe.getTopicName(),
-                    subscribe.getOffset() == null ? null : 
subscribe.getOffset().get(),
-                    Thread.currentThread().getName(), e);
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-}
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
index 9eb753e3f..edb66703f 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
@@ -20,17 +20,12 @@ package org.apache.eventmesh.storage.standalone.consumer;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.EventListener;
 import org.apache.eventmesh.api.consumer.Consumer;
-import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
-import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
 import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
-import org.apache.eventmesh.storage.standalone.broker.task.SubscribeTask;
 
 import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.cloudevents.CloudEvent;
@@ -45,16 +40,10 @@ public class StandaloneConsumer implements Consumer {
 
     private final ConcurrentHashMap<String, Subscribe> subscribeTable;
 
-    private final ExecutorService consumeExecutorService;
-
     public StandaloneConsumer(Properties properties) {
         this.standaloneBroker = StandaloneBroker.getInstance();
         this.subscribeTable = new ConcurrentHashMap<>(16);
         this.isStarted = new AtomicBoolean(false);
-        this.consumeExecutorService = 
ThreadPoolFactory.createThreadPoolExecutor(
-            Runtime.getRuntime().availableProcessors() * 2,
-            Runtime.getRuntime().availableProcessors() * 2,
-            "StandaloneConsumerThread");
     }
 
     @Override
@@ -86,8 +75,6 @@ public class StandaloneConsumer implements Consumer {
 
     @Override
     public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext 
context) {
-        cloudEvents.forEach(cloudEvent -> standaloneBroker.updateOffset(
-            new TopicMetadata(cloudEvent.getSubject()), 
Objects.requireNonNull((Long) cloudEvent.getExtension("offset"))));
 
     }
 
@@ -99,9 +86,8 @@ public class StandaloneConsumer implements Consumer {
         synchronized (subscribeTable) {
             standaloneBroker.createTopicIfAbsent(topic);
             Subscribe subscribe = new Subscribe(topic, standaloneBroker, 
listener);
-            SubscribeTask subScribeTask = new SubscribeTask(subscribe);
+            subscribe.subscribe();
             subscribeTable.put(topic, subscribe);
-            consumeExecutorService.execute(subScribeTask);
         }
     }
 
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
index 5ea0ab6f1..0c16aabb3 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
@@ -17,9 +17,14 @@
 
 package org.apache.eventmesh.storage.standalone;
 
+import org.apache.eventmesh.storage.standalone.broker.Channel;
 import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
+import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
 import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
 import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
+
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.net.URI;
 import java.util.Collections;
@@ -29,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
 
+
 public class TestUtils {
 
     public static final String TEST_TOPIC = "test-topic";
@@ -36,12 +42,18 @@ public class TestUtils {
     public static final int LENGTH = 5;
     public static final int EXCEEDED_MESSAGE_STORE_WINDOW = 60 * 60 * 1000 + 
1000;
 
-    public static ConcurrentHashMap<TopicMetadata, MessageQueue> 
createDefaultMessageContainer() {
-        ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = new 
ConcurrentHashMap<>(1);
-        messageContainer.put(new TopicMetadata(TEST_TOPIC), new 
MessageQueue());
-        return messageContainer;
+    public static Pair<ConcurrentHashMap<TopicMetadata, Channel>, 
ConcurrentHashMap<TopicMetadata, Subscribe>> createDefaultMessageContainer(
+        StandaloneBroker broker) {
+        ConcurrentHashMap<TopicMetadata, Channel> messageContainer = new 
ConcurrentHashMap<>(1);
+        ConcurrentHashMap<TopicMetadata, Subscribe> subscribeContainer = new 
ConcurrentHashMap<>(1);
+
+        Subscribe subscribe = createSubscribe(broker);
+        subscribe.subscribe();
+        subscribeContainer.put(new TopicMetadata(TEST_TOPIC), subscribe);
+        return Pair.of(messageContainer, subscribeContainer);
     }
 
+
     public static ConcurrentHashMap<TopicMetadata, MessageQueue> 
createMessageContainer(TopicMetadata topicMetadata, MessageEntity messageEntity)
         throws InterruptedException {
         ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = new 
ConcurrentHashMap<>(1);
@@ -79,4 +91,15 @@ public class TestUtils {
             offSet,
             currentTimeMillis);
     }
+
+    public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) 
{
+        return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, 
context) -> {
+        });
+    }
+
+    public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, 
List<CloudEvent> cloudEvents) {
+        return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, 
context) -> {
+            cloudEvents.add(cloudEvent);
+        });
+    }
 }
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
index 2d84df265..7200f902e 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
@@ -17,18 +17,20 @@
 
 package org.apache.eventmesh.storage.standalone.admin;
 
-import static org.apache.eventmesh.storage.standalone.TestUtils.LENGTH;
-import static org.apache.eventmesh.storage.standalone.TestUtils.OFF_SET;
 import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
 import static 
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent;
 import static 
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultMessageContainer;
 import static 
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultMessageEntity;
 
-import org.apache.eventmesh.api.admin.TopicProperties;
+import org.apache.eventmesh.storage.standalone.broker.Channel;
 import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
 import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
+import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
 
-import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -52,6 +54,7 @@ public class StandaloneAdminTest {
 
     private StandaloneAdmin standaloneAdmin;
 
+
     @BeforeEach
     public void setUp() {
         initStaticInstance();
@@ -69,13 +72,6 @@ public class StandaloneAdminTest {
         Assertions.assertTrue(standaloneAdmin.isClosed());
     }
 
-    @Test
-    public void testGetTopic() throws Exception {
-        List<TopicProperties> topicPropertiesList = standaloneAdmin.getTopic();
-        Assertions.assertNotNull(topicPropertiesList);
-        Assertions.assertFalse(topicPropertiesList.isEmpty());
-    }
-
     @Test
     public void testCreateTopic() {
         standaloneAdmin.createTopic(TEST_TOPIC);
@@ -88,21 +84,6 @@ public class StandaloneAdminTest {
         Mockito.verify(standaloneBroker).deleteTopicIfExist(TEST_TOPIC);
     }
 
-    @Test
-    public void testGetEvent() throws Exception {
-        
Mockito.when(standaloneBroker.checkTopicExist(TEST_TOPIC)).thenReturn(Boolean.TRUE);
-        Mockito.when(standaloneBroker.getMessage(TEST_TOPIC, 
OFF_SET)).thenReturn(createDefaultCloudEvent());
-        List<CloudEvent> events = standaloneAdmin.getEvent(TEST_TOPIC, 
OFF_SET, LENGTH);
-        Assertions.assertNotNull(events);
-        Assertions.assertFalse(events.isEmpty());
-    }
-
-    @Test
-    public void testGetEvent_throwException() {
-        
Mockito.when(standaloneBroker.checkTopicExist(TEST_TOPIC)).thenReturn(Boolean.FALSE);
-        Exception exception = Assertions.assertThrows(Exception.class, () -> 
standaloneAdmin.getEvent(TEST_TOPIC, OFF_SET, LENGTH));
-        Assertions.assertEquals("The topic name doesn't exist in the message 
queue", exception.getMessage());
-    }
 
     @Test
     public void testPublish() throws Exception {
@@ -116,7 +97,11 @@ public class StandaloneAdminTest {
     private void initStaticInstance() {
         try (MockedStatic<StandaloneBroker> standaloneBrokerMockedStatic = 
Mockito.mockStatic(StandaloneBroker.class)) {
             
standaloneBrokerMockedStatic.when(StandaloneBroker::getInstance).thenReturn(standaloneBroker);
-            
Mockito.when(standaloneBroker.getMessageContainer()).thenReturn(createDefaultMessageContainer());
+            Pair<ConcurrentHashMap<TopicMetadata, Channel>, 
ConcurrentHashMap<TopicMetadata, Subscribe>> pair =
+                createDefaultMessageContainer(standaloneBroker);
+            
Mockito.when(standaloneBroker.getSubscribeContainer()).thenReturn(pair.getRight());
+            
Mockito.when(standaloneBroker.getMessageContainer()).thenReturn(pair.getLeft());
+
             standaloneAdmin = new StandaloneAdmin();
         }
     }
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
index 3582f95ef..6d84cb780 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
@@ -17,15 +17,12 @@
 
 package org.apache.eventmesh.storage.standalone.broker;
 
-import static org.apache.eventmesh.storage.standalone.TestUtils.OFF_SET;
 import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
 import static 
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent;
+import static 
org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe;
 
 import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -34,6 +31,14 @@ import io.cloudevents.CloudEvent;
 
 public class StandaloneBrokerTest {
 
+
+    public StandaloneBroker getStandaloneBroker() {
+        StandaloneBroker instance = StandaloneBroker.getInstance();
+        Subscribe subscribe = createSubscribe(instance);
+        subscribe.subscribe();
+        return instance;
+    }
+
     @Test
     public void testGetInstance() {
         Assertions.assertNotNull(StandaloneBroker.getInstance());
@@ -41,49 +46,23 @@ public class StandaloneBrokerTest {
 
     @Test
     public void testCreateTopicIfAbsent() {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
-        Pair<MessageQueue, AtomicLong> pair = 
instance.createTopicIfAbsent(TEST_TOPIC);
+        StandaloneBroker instance = getStandaloneBroker();
+        Channel pair = instance.createTopicIfAbsent(TEST_TOPIC);
         Assertions.assertNotNull(pair);
     }
 
     @Test
     public void testPutMessage() throws InterruptedException {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
+        StandaloneBroker instance = getStandaloneBroker();
         CloudEvent cloudEvent = createDefaultCloudEvent();
         MessageEntity messageEntity = instance.putMessage(TEST_TOPIC, 
cloudEvent);
         Assertions.assertNotNull(messageEntity);
     }
 
-    @Test
-    public void testTakeMessage() throws InterruptedException {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
-        CloudEvent cloudEvent = createDefaultCloudEvent();
-        instance.putMessage(TEST_TOPIC, cloudEvent);
-        CloudEvent message = instance.takeMessage(TEST_TOPIC);
-        Assertions.assertNotNull(message);
-    }
-
-    @Test
-    public void testGetMessage() throws InterruptedException {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
-        CloudEvent cloudEvent = createDefaultCloudEvent();
-        instance.putMessage(TEST_TOPIC, cloudEvent);
-        CloudEvent cloudEventResult = instance.getMessage(TEST_TOPIC);
-        Assertions.assertNotNull(cloudEventResult);
-    }
-
-    @Test
-    public void testMessageWithOffSet() throws InterruptedException {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
-        CloudEvent cloudEvent = createDefaultCloudEvent();
-        instance.putMessage(TEST_TOPIC, cloudEvent);
-        CloudEvent cloudEventResult = instance.getMessage(TEST_TOPIC, OFF_SET);
-        Assertions.assertNotNull(cloudEventResult);
-    }
 
     @Test
     public void testCheckTopicExist() throws InterruptedException {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
+        StandaloneBroker instance = getStandaloneBroker();
         CloudEvent cloudEvent = createDefaultCloudEvent();
         instance.putMessage(TEST_TOPIC, cloudEvent);
         boolean exists = instance.checkTopicExist(TEST_TOPIC);
@@ -92,7 +71,7 @@ public class StandaloneBrokerTest {
 
     @Test
     public void testDeleteTopicIfExist() throws InterruptedException {
-        StandaloneBroker instance = StandaloneBroker.getInstance();
+        StandaloneBroker instance = getStandaloneBroker();
         CloudEvent cloudEvent = createDefaultCloudEvent();
         instance.putMessage(TEST_TOPIC, cloudEvent);
         instance.deleteTopicIfExist(TEST_TOPIC);
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
index bc11c9b0a..3ef86bdd2 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
@@ -18,14 +18,11 @@
 package org.apache.eventmesh.storage.standalone.broker.task;
 
 import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
-import static 
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 
 import org.apache.eventmesh.api.EventListener;
-import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
 
 import org.junit.jupiter.api.Assertions;
@@ -35,8 +32,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import io.cloudevents.CloudEvent;
-
 @ExtendWith(MockitoExtension.class)
 public class SubscribeTest {
 
@@ -48,12 +43,9 @@ public class SubscribeTest {
 
     @Test
     public void testSubscribe() {
-        CloudEvent cloudEvent = createDefaultCloudEvent();
-        
Mockito.when(standaloneBroker.getMessage(anyString())).thenReturn(cloudEvent);
-        Mockito.when(standaloneBroker.getMessage(anyString(), 
anyLong())).thenReturn(cloudEvent);
         subscribe = new Subscribe(TEST_TOPIC, standaloneBroker, eventListener);
         subscribe.subscribe();
-        Mockito.verify(eventListener).consume(any(CloudEvent.class), 
any(EventMeshAsyncConsumeContext.class));
+        Mockito.verify(standaloneBroker).subscribed(anyString(), 
any(Subscribe.class));
     }
 
     @Test
diff --git 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
index 37cdc02c6..4bfee4976 100644
--- 
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
+++ 
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.eventmesh.storage.standalone.producer;
 
+import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
+
 import org.apache.eventmesh.api.SendResult;
 import org.apache.eventmesh.storage.standalone.TestUtils;
+import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
 
 import java.util.Properties;
 
@@ -28,10 +31,13 @@ import org.junit.jupiter.api.Test;
 
 import io.cloudevents.CloudEvent;
 
+
+
 public class StandaloneProducerTest {
 
     private StandaloneProducer standaloneProducer;
 
+
     @BeforeEach
     public void setUp() {
         standaloneProducer = new StandaloneProducer(new Properties());
@@ -61,6 +67,8 @@ public class StandaloneProducerTest {
 
     @Test
     public void testPublish() {
+        StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
+        standaloneBroker.createTopicIfAbsent(TEST_TOPIC);
         CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent();
         SendResult sendResult = standaloneProducer.publish(cloudEvent);
         Assertions.assertNotNull(sendResult);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to